1
0
personal-data/personal_data/util.py
2024-08-25 20:38:16 +02:00

195 lines
5.2 KiB
Python

import csv
import datetime
import decimal
import io
import urllib.parse
from pathlib import Path
import logging
from collections.abc import Iterable, Mapping, Sequence, Callable
from decimal import Decimal
import typing
from frozendict import frozendict
from . import data
logger = logging.getLogger(__name__)
CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
T = typing.TypeVar('T')
def try_value(fn: Callable[[str],T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def to_value(s: str) -> str | Decimal | datetime.date | datetime.datetime | urllib.parse.ParseResult | bool | None:
s = s.strip()
if len(s) == 0:
return None
if (v_decimal := try_value(Decimal, s)) is not None:
return v_decimal
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
return v_date
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
return v_datetime
if s.startswith(('http://', 'https://')):
return urllib.parse.urlparse(s)
if s.lower() == 'false':
return False
if s.lower() == 'true':
return True
if s.lower() == 'none':
return None
return s
def equals_without_fields(
a: Mapping[str, object],
b: Mapping[str, object],
fields: Iterable[str] = frozenset(),
) -> bool:
a = dict(a)
b = dict(b)
for f in fields:
del a[f], b[f]
return frozendict(a) == frozendict(b)
def deduplicate_by_ignoring_certain_fields(
dicts: list[dict],
deduplicate_ignore_columns: Iterable[str],
) -> list[dict]:
"""Removes duplicates that occur when ignoring certain columns.
Output order is stable.
"""
to_remove = set()
for idx1, first in enumerate(dicts):
for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
if equals_without_fields(first, second, deduplicate_ignore_columns):
to_remove.add(idx2)
to_remove = sorted(to_remove)
while to_remove:
del dicts[to_remove.pop()]
return dicts
def deduplicate_dicts(
dicts: Sequence[dict],
deduplicate_mode: data.DeduplicateMode,
deduplicate_ignore_columns: list[str],
) -> tuple[Sequence[dict], list[str]]:
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
fieldnames = []
for d in dicts:
for k in d.keys():
if k not in fieldnames:
fieldnames.append(k)
del k
del d
if deduplicate_mode == data.DeduplicateMode.ONLY_LATEST:
while len(dicts) >= 2 and equals_without_fields(
dicts[-1],
dicts[-2],
deduplicate_ignore_columns,
):
del dicts[-1]
elif deduplicate_mode == data.DeduplicateMode.BY_ALL_COLUMNS:
dicts = deduplicate_by_ignoring_certain_fields(
dicts,
deduplicate_ignore_columns,
)
elif deduplicate_mode != data.DeduplicateMode.NONE:
dicts = set(dicts)
dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
return dicts, fieldnames
def normalize_dict(d: dict) -> frozendict:
return frozendict(
{k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
)
def load_csv_file(csv_file: Path) -> list[frozendict]:
dicts: list[frozendict] = []
with open(csv_file) as csvfile:
reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
return dicts
def extend_csv_file(
csv_file: Path,
new_dicts: list[dict],
deduplicate_mode: data.DeduplicateMode,
deduplicate_ignore_columns: list[str],
) -> dict:
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
try:
dicts = load_csv_file(csv_file)
except FileNotFoundError as e:
logger.info('Creating file: %s', csv_file)
dicts = []
original_num_dicts = len(dicts)
dicts += [normalize_dict(d) for d in new_dicts]
del new_dicts
dicts, fieldnames = deduplicate_dicts(
dicts,
deduplicate_mode,
deduplicate_ignore_columns,
)
csvfile_in_memory = io.StringIO()
writer = csv.DictWriter(
csvfile_in_memory,
fieldnames=fieldnames,
dialect=CSV_DIALECT,
)
writer.writeheader()
for d in dicts:
writer.writerow(d)
output_csv = csvfile_in_memory.getvalue()
del writer, csvfile_in_memory
csv_file.parent.mkdir(parents=True,exist_ok=True)
with open(csv_file, 'w') as csvfile:
csvfile.write(output_csv)
del csvfile
logger.info(
'Extended CSV "%s" from %d to %d lines',
csv_file,
original_num_dicts,
len(dicts),
)
return {
'extended': original_num_dicts != len(dicts),
'input_lines': original_num_dicts,
'output_lines': len(dicts),
'dicts': dicts,
}