From f47daa325696473be01531cf0c33d4535baf568a Mon Sep 17 00:00:00 2001 From: Jon Michael Aanes Date: Sun, 13 Oct 2024 15:04:18 +0200 Subject: [PATCH] Merging git_time_tracker's advanced CSV parsing into personal_data --- git_time_tracker/source/csv_file.py | 67 +----------- personal_data/csv_import.py | 153 ++++++++++++++++++++++++++++ personal_data/util.py | 77 ++------------ test/test_csv_import.py | 16 +++ test/test_parsing.py | 10 +- 5 files changed, 188 insertions(+), 135 deletions(-) create mode 100644 personal_data/csv_import.py create mode 100644 test/test_csv_import.py diff --git a/git_time_tracker/source/csv_file.py b/git_time_tracker/source/csv_file.py index ee66f2e..0130dc2 100644 --- a/git_time_tracker/source/csv_file.py +++ b/git_time_tracker/source/csv_file.py @@ -6,72 +6,10 @@ from decimal import Decimal from pathlib import Path import dataclasses -from personal_data.util import load_csv_file +from personal_data.csv_import import load_csv_file, start_end, determine_possible_keys from ..data import WorkSample -@dataclasses.dataclass -class PossibleKeys: - time_start: list[str] - time_end: list[str] - duration: list[str] - name: list[str] - image: list[str] - misc: list[str] - -def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys: - # Select data - time_keys = [ - k for k, v in event_data.items() if isinstance(v, datetime.date) - ] - duration_keys = [ - k - for k, v in event_data.items() - if isinstance(v, Decimal) and 'duration_seconds' in k - ] - name_keys = [k for k, v in event_data.items() if isinstance(v, str)] - image_keys = [ - k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult) - ] - - misc_keys = list(event_data.keys()) - for k in image_keys: - if k in misc_keys: - misc_keys.remove(k) - del k - for k in time_keys: - if k in misc_keys: - misc_keys.remove(k) - del k - - time_start_keys = [k for k in time_keys if 'start' in k.lower() ] - time_end_keys = [k for k in time_keys if 'end' in k.lower() or 'stop' in k.lower() ] - - return PossibleKeys( - time_start = time_start_keys, - time_end = time_end_keys, - duration = duration_keys, - name = name_keys, - image = image_keys, - misc = misc_keys, - ) - -def start_end(sample: dict[str,Any], keys: PossibleKeys) -> tuple[datetime.datetime | None, datetime.datetime | None]: - if keys.time_start and keys.time_end: - return (sample[keys.time_start[0]], sample[keys.time_end[0]]) - - if keys.time_start and keys.duration: - start = sample[keys.time_start[0]] - duration = datetime.timedelta(seconds=float(sample[keys.duration[0]])) - return (start, start + duration) - - if keys.time_start: - start = sample[keys.time_start[0]] - return (start, None) - if keys.time_end: - return (None, sample[keys.time_end[0]]) - return (None, None) - def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample]: assert len(rows) > 0 max_title_parts = 2 @@ -79,9 +17,11 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample if True: event_data = rows[len(rows)//2] # Hopefully select a useful representative. + print(event_data) possible_keys = determine_possible_keys(event_data) del event_data + print(possible_keys) assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1 assert len(possible_keys.image) >= 0 @@ -94,7 +34,6 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample image = event_data[possible_keys.image[0]] if possible_keys.image else None ''' - (start_at, end_at) = start_end(event_data, possible_keys) labels = [f'{k}:{event_data[k]}' for k in possible_keys.misc] diff --git a/personal_data/csv_import.py b/personal_data/csv_import.py new file mode 100644 index 0000000..4d4c6b9 --- /dev/null +++ b/personal_data/csv_import.py @@ -0,0 +1,153 @@ +import datetime +import urllib.parse +from typing import Any +from collections.abc import Iterator +from decimal import Decimal +from pathlib import Path +import dataclasses +import _csv +import csv +import datetime +import decimal +import io +import logging +import typing +import urllib.parse +from collections.abc import Callable, Iterable, Mapping, Sequence +from decimal import Decimal +from pathlib import Path + +from frozendict import frozendict + +CSV_DIALECT = 'one_true_dialect' +csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True) + +T = typing.TypeVar('T') + + +def try_value(fn: Callable[[str], T], s: str) -> T | None: + try: + return fn(s) + except (ValueError, decimal.InvalidOperation): + return None + + +def csv_str_to_value( + s: str, +) -> ( + str + | Decimal + | datetime.date + | datetime.datetime + | urllib.parse.ParseResult + | bool + | None +): + assert not isinstance(s, list) # TODO? + + if s is None: + return None + s = s.strip() + if len(s) == 0: + return None + if (v_decimal := try_value(Decimal, s)) is not None: + return v_decimal + if (v_date := try_value(datetime.date.fromisoformat, s)) is not None: + return v_date + if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None: + return v_datetime + if s.startswith(('http://', 'https://')): + return urllib.parse.urlparse(s) + if s.lower() == 'false': + return False + if s.lower() == 'true': + return True + if s.lower() == 'none': + return None + return s + + + +def load_csv_file(csv_file: Path, sniff=False) -> list[frozendict[str, typing.Any]]: + dicts: list[frozendict] = [] + with open(csv_file) as csvfile: + if sniff: + dialect = csv.Sniffer().sniff(csvfile.read(1024)) + csvfile.seek(0) + else: + dialect = CSV_DIALECT + reader = csv.DictReader(csvfile, dialect=dialect) + for row in reader: + for k in list(row.keys()): + orig = row[k] + row[k] = csv_str_to_value(orig) + if row[k] is None: + del row[k] + del k, orig + dicts.append(frozendict(row)) + del row + del csvfile + return dicts + + +@dataclasses.dataclass +class PossibleKeys: + time_start: list[str] + time_end: list[str] + duration: list[str] + name: list[str] + image: list[str] + misc: list[str] + +def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys: + # Select data + time_keys = [ + k for k, v in event_data.items() if isinstance(v, datetime.date) + ] + duration_keys = [ + k + for k, v in event_data.items() + if isinstance(v, Decimal) and 'duration_seconds' in k + ] + name_keys = [k for k, v in event_data.items() if isinstance(v, str)] + image_keys = [ + k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult) + ] + + misc_keys = list(event_data.keys()) + for k in image_keys: + if k in misc_keys: + misc_keys.remove(k) + del k + for k in time_keys: + if k in misc_keys: + misc_keys.remove(k) + del k + + time_start_keys = [k for k in time_keys if 'start' in k.lower() ] + time_end_keys = [k for k in time_keys if 'end' in k.lower() or 'stop' in k.lower() or 'last' in k.lower() ] + + return PossibleKeys( + time_start = time_start_keys, + time_end = time_end_keys, + duration = duration_keys, + name = name_keys, + image = image_keys, + misc = misc_keys, + ) + +def start_end(sample: dict[str,Any], keys: PossibleKeys) -> tuple[datetime.datetime | None, datetime.datetime | None]: + if keys.time_start and keys.time_end: + return (sample[keys.time_start[0]], sample[keys.time_end[0]]) + + if keys.time_start and keys.duration: + start = sample[keys.time_start[0]] + duration = datetime.timedelta(seconds=float(sample[keys.duration[0]])) + return (start, start + duration) + + if keys.time_start: + start = sample[keys.time_start[0]] + return (start, None) + if keys.time_end: + return (None, sample[keys.time_end[0]]) + return (None, None) diff --git a/personal_data/util.py b/personal_data/util.py index 2c79f50..5a28a12 100644 --- a/personal_data/util.py +++ b/personal_data/util.py @@ -3,6 +3,7 @@ import csv import datetime import decimal import io +from typing import Any import logging import typing import urllib.parse @@ -12,57 +13,12 @@ from pathlib import Path from frozendict import frozendict -from . import data +from . import data, csv_import logger = logging.getLogger(__name__) -CSV_DIALECT = 'one_true_dialect' -csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True) -T = typing.TypeVar('T') - - -def try_value(fn: Callable[[str], T], s: str) -> T | None: - try: - return fn(s) - except (ValueError, decimal.InvalidOperation): - return None - - -def csv_str_to_value( - s: str, -) -> ( - str - | Decimal - | datetime.date - | datetime.datetime - | urllib.parse.ParseResult - | bool - | None -): - if s is None: - return None - s = s.strip() - if len(s) == 0: - return None - if (v_decimal := try_value(Decimal, s)) is not None: - return v_decimal - if (v_date := try_value(datetime.date.fromisoformat, s)) is not None: - return v_date - if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None: - return v_datetime - if s.startswith(('http://', 'https://')): - return urllib.parse.urlparse(s) - if s.lower() == 'false': - return False - if s.lower() == 'true': - return True - if s.lower() == 'none': - return None - return s - - -def csv_safe_value(v: object) -> str: +def csv_safe_value(v: Any) -> str: if isinstance(v, urllib.parse.ParseResult): return v.geturl() if isinstance(v, datetime.datetime): @@ -145,32 +101,13 @@ def deduplicate_dicts( def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]: return frozendict( { - k: csv_str_to_value(str(v)) + k: csv_import.csv_str_to_value(str(v)) for k, v in d.items() - if csv_str_to_value(str(v)) is not None + if csv_import.csv_str_to_value(str(v)) is not None }, ) -def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]: - dicts: list[frozendict] = [] - with open(csv_file) as csvfile: - dialect = csv.Sniffer().sniff(csvfile.read(1024)) - csvfile.seek(0) - reader = csv.DictReader(csvfile, dialect=dialect) - for row in reader: - for k in list(row.keys()): - orig = row[k] - row[k] = csv_str_to_value(orig) - if row[k] is None: - del row[k] - del k, orig - dicts.append(frozendict(row)) - del row - del csvfile - return dicts - - def extend_csv_file( csv_file: Path, new_dicts: list[dict[str, typing.Any]], @@ -180,7 +117,7 @@ def extend_csv_file( assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns try: - dicts = load_csv_file(csv_file) + dicts = csv_import.load_csv_file(csv_file) except (FileNotFoundError, _csv.Error) as e: logger.info('Creating file: %s', csv_file) dicts = [] @@ -199,7 +136,7 @@ def extend_csv_file( writer = csv.DictWriter( csvfile_in_memory, fieldnames=fieldnames, - dialect=CSV_DIALECT, + dialect=csv_import.CSV_DIALECT, ) writer.writeheader() for d in dicts: diff --git a/test/test_csv_import.py b/test/test_csv_import.py new file mode 100644 index 0000000..ef9da1b --- /dev/null +++ b/test/test_csv_import.py @@ -0,0 +1,16 @@ +from personal_data.csv_import import determine_possible_keys +import frozendict +import datetime + +def test_determine_possible_keys(): + data = frozendict.frozendict({'game.name': 'Halo', 'me.last_played_time': + datetime.datetime(2021, 6, 13, 19, 12, 21, + tzinfo=datetime.timezone.utc), + 'trophy.name': 'Test', 'trophy.desc': + 'Description'}) + keys = determine_possible_keys(data) + + assert keys.time_start == [] + assert keys.time_end == ['me.last_played_time'] + assert keys.duration == [] + assert len(keys.name) == 3 diff --git a/test/test_parsing.py b/test/test_parsing.py index 8a2e96f..454ec04 100644 --- a/test/test_parsing.py +++ b/test/test_parsing.py @@ -3,13 +3,21 @@ from decimal import Decimal import pytest -from personal_data.util import csv_str_to_value +from personal_data.csv_import import csv_str_to_value PARSE_MAPPINGS = [ ( '2024-04-28 21:35:40+00:00', datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC), ), + ( + '2024-07-06 19:30:11+02:00', + datetime.datetime(2024, 7, 6, 19, 30, 11, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200))), + ), + ( + '2023-10-21 11:43:27+02:00', + datetime.datetime(2023, 10, 21, 11, 43, 27, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200))), + ), ( '0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38', '0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',