1
0
personal-data/personal_data/util.py
Jon Michael Aanes 6749479f38
All checks were successful
Run Python tests (through Pytest) / Test (push) Successful in 34s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 30s
Support dataclasses obj
2025-02-01 20:33:54 +01:00

164 lines
4.6 KiB
Python

import _csv
import csv
import dataclasses
import datetime
import io
import logging
import urllib.parse
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any
from frozendict import frozendict
from . import csv_import, data
logger = logging.getLogger(__name__)
def equals_without_fields(
    a: Mapping[str, Any],
    b: Mapping[str, Any],
    fields: Iterable[str] = frozenset(),
) -> bool:
    """Return True if mappings *a* and *b* are equal once *fields* are removed.

    Fields that are absent from a mapping are simply ignored instead of
    raising ``KeyError``, so a column may be listed for ignoring even when
    not every row carries it.

    :param a: first mapping to compare.
    :param b: second mapping to compare.
    :param fields: keys to exclude from the comparison.
    """
    a = dict(a)
    b = dict(b)
    for f in fields:
        # pop with a default tolerates fields missing from either side.
        a.pop(f, None)
        b.pop(f, None)
    # Plain dict equality; wrapping both sides in frozendict would compare
    # the exact same key/value pairs.
    return a == b
def deduplicate_by_ignoring_certain_fields(
    dicts: list[Mapping[str, Any]],
    deduplicate_ignore_columns: Iterable[str],
) -> list[Mapping[str, Any]]:
    """Removes duplicates that occur when ignoring certain columns.

    The first occurrence of each duplicate group is kept.  The input list is
    pruned in place and also returned.  Output order is stable.

    Runs in O(n) using a seen-set instead of the quadratic pairwise
    comparison; row values are assumed hashable (they come from CSV
    normalization, i.e. scalars).

    :param dicts: rows to deduplicate.
    :param deduplicate_ignore_columns: column names excluded from the
        equality comparison.
    """
    ignored = set(deduplicate_ignore_columns)
    seen: set[tuple] = set()
    duplicate_indices: list[int] = []
    for idx, row in enumerate(dicts):
        # Sorting the (key, value) pairs by key gives a canonical, hashable
        # fingerprint; keys are unique strings, so values are never compared.
        fingerprint = tuple(
            sorted((k, v) for k, v in row.items() if k not in ignored),
        )
        if fingerprint in seen:
            duplicate_indices.append(idx)
        else:
            seen.add(fingerprint)
    # Delete from the end so earlier indices remain valid.
    for idx in reversed(duplicate_indices):
        del dicts[idx]
    return dicts
def deduplicate_dicts(
    dicts: list[frozendict[str, Any]],
    deduplicate_mode: data.DeduplicateMode,
    deduplicate_ignore_columns: list[str],
) -> tuple[list[frozendict[str, Any]], list[str]]:
    """Deduplicate rows according to *deduplicate_mode* and sort them.

    Returns the deduplicated-and-sorted rows together with the list of
    field names, collected in first-seen order across all rows.

    :raises TypeError: if *deduplicate_ignore_columns* is not a list.
    """
    if not isinstance(deduplicate_ignore_columns, list):
        raise TypeError(deduplicate_ignore_columns)

    # Collect every column name, preserving the order of first appearance.
    fieldnames: list[str] = []
    known_fields: set[str] = set()
    for row in dicts:
        for field in row:
            if field not in known_fields:
                known_fields.add(field)
                fieldnames.append(field)

    if deduplicate_mode == data.DeduplicateMode.ONLY_LATEST:
        # Trim trailing rows that repeat their predecessor.
        while len(dicts) > 1 and equals_without_fields(
            dicts[-1],
            dicts[-2],
            deduplicate_ignore_columns,
        ):
            del dicts[-1]
    elif deduplicate_mode == data.DeduplicateMode.BY_ALL_COLUMNS:
        dicts = deduplicate_by_ignoring_certain_fields(
            dicts,
            deduplicate_ignore_columns,
        )
    elif deduplicate_mode != data.DeduplicateMode.NONE:
        # Any other mode: plain set-based deduplication.
        dicts = list(set(dicts))

    def sort_key(row):
        return tuple(str(row.get(field, '')) for field in fieldnames)

    dicts = sorted(dicts, key=sort_key)
    return dicts, fieldnames
def dataclass_to_dict(obj) -> dict[str, Any]:
    """Convert a dataclass instance to a plain dict.

    The first underscore of each field name is rewritten to a dot, e.g.
    ``event_title`` becomes ``event.title`` — presumably to match the dotted
    column-naming convention of the CSV files.
    """
    converted: dict[str, Any] = {}
    for field_name, value in dataclasses.asdict(obj).items():
        converted[field_name.replace('_', '.', 1)] = value
    return converted
def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
    """Normalize a row into an immutable mapping of CSV-safe values.

    Inputs that are not mappings are assumed to be dataclass instances and
    are first converted with :func:`dataclass_to_dict`.  Every value is
    round-tripped through the CSV serialization helpers, and entries whose
    normalized value is ``None`` are dropped.
    """
    if not isinstance(d, (dict, frozendict)):
        d = dataclass_to_dict(d)
    normalized: dict[str, Any] = {}
    for key, value in d.items():
        cleaned = csv_import.csv_str_to_value(csv_import.csv_safe_value(value))
        if cleaned is not None:
            normalized[key] = cleaned
    return frozendict(normalized)
def extend_csv_file(
    csv_file: Path,
    new_dicts: list[dict[str, Any] | frozendict[str, Any]],
    deduplicate_mode: data.DeduplicateMode,
    deduplicate_ignore_columns: list[str],
) -> dict:
    """Merge *new_dicts* into *csv_file*, deduplicate, sort and rewrite it.

    The existing file (if any) is loaded, combined with the new rows,
    normalized, deduplicated per *deduplicate_mode*, and written back in
    one atomic-ish pass (rendered fully in memory before the file is
    touched).

    :param csv_file: path of the CSV file to extend; created if missing.
    :param new_dicts: rows (mappings or dataclass instances) to append.
    :param deduplicate_mode: how duplicates are detected and removed.
    :param deduplicate_ignore_columns: columns excluded from duplicate
        comparison.
    :returns: summary dict with keys ``extended``, ``input_lines``,
        ``output_lines`` and ``dicts``.
    :raises TypeError: if *deduplicate_ignore_columns* is not a list.
    """
    # The dataclass-level default on data.Scraper means "ignore nothing".
    if deduplicate_ignore_columns == data.Scraper.deduplicate_ignore_columns:
        deduplicate_ignore_columns = []
    if not isinstance(deduplicate_ignore_columns, list):
        raise TypeError(deduplicate_ignore_columns)

    # Load existing rows; a missing or malformed file starts from scratch.
    try:
        original_dicts = csv_import.load_csv_file(csv_file)
    except (FileNotFoundError, _csv.Error):
        logger.info('Creating file: %s', csv_file)
        original_dicts = []
    original_num_dicts = len(original_dicts)

    dicts = [normalize_dict(d) for d in original_dicts] + [
        normalize_dict(d) for d in new_dicts
    ]
    del new_dicts
    dicts, fieldnames = deduplicate_dicts(
        dicts,
        deduplicate_mode,
        deduplicate_ignore_columns,
    )

    # Render the whole CSV in memory first, so the on-disk file is only
    # replaced once everything has serialized successfully.
    csvfile_in_memory = io.StringIO()
    writer = csv.DictWriter(
        csvfile_in_memory,
        fieldnames=fieldnames,
        dialect=csv_import.CSV_DIALECT,
    )
    writer.writeheader()
    for row in dicts:
        writer.writerow({k: csv_import.csv_safe_value(v) for k, v in row.items()})
    output_csv = csvfile_in_memory.getvalue()

    csv_file.parent.mkdir(parents=True, exist_ok=True)
    csv_file.write_text(output_csv)

    logger.info(
        'Extended CSV "%s" from %d to %d lines',
        csv_file,
        original_num_dicts,
        len(dicts),
    )
    return {
        'extended': original_num_dicts != len(dicts),
        'input_lines': original_num_dicts,
        'output_lines': len(dicts),
        'dicts': dicts,
    }