# personal-data/personal_data/main.py
import csv
import datetime
import decimal
import inspect
import io
import logging
from collections.abc import Iterable, Mapping, Sequence
from decimal import Decimal
from pathlib import Path

import requests
import requests_cache
from frozendict import frozendict

logger = logging.getLogger(__name__)

try:
    import cfscrape
except ImportError:
    cfscrape = None
    logger.warning('cfscrape not installed: Certain fetchers might not work')

try:
    import browsercookie
except ImportError:
    browsercookie = None
    logger.warning('browsercookie not installed: Certain fetchers might not work')

import personal_data.data
import personal_data.fetchers
from . import notification

CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)

OUTPUT_PATH = Path('./output')

logging.basicConfig()
logger.setLevel('INFO')


def try_value(fn, s: str) -> object:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None


def to_value(s: str) -> object:
s = s.strip()
if len(s) == 0:
return None
if (v := try_value(Decimal, s)) is not None:
return v
if v := try_value(datetime.date.fromisoformat, s):
return v
if v := try_value(datetime.datetime.fromisoformat, s):
return v
if s.lower() == 'false':
return False
if s.lower() == 'true':
return True
if s.lower() == 'none':
return None
return s
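
# Illustrative coercions performed by to_value (derived from the branches above):
#   '1.50'       -> Decimal('1.50')
#   '2024-07-27' -> datetime.date(2024, 7, 27)
#   'true'       -> True
#   ''           -> None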


def equals_without_fields(
a: Mapping[str, object],
b: Mapping[str, object],
fields: Iterable[str] = frozenset(),
) -> bool:
a = dict(a)
b = dict(b)
for f in fields:
del a[f], b[f]
return frozendict(a) == frozendict(b)
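
# Illustrative: rows that differ only in an ignored field compare equal, e.g.
#   equals_without_fields({'name': 'a', 'seen': 1}, {'name': 'a', 'seen': 2}, ['seen'])  -> True
# (the field names here are hypothetical).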


def deduplicate_by_ignoring_certain_fields(
dicts: list[dict],
deduplicate_ignore_columns: Iterable[str],
) -> list[dict]:
"""Removes duplicates that occur when ignoring certain columns.
Output order is stable.
"""
to_remove = set()
for idx1, first in enumerate(dicts):
for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
if equals_without_fields(first, second, deduplicate_ignore_columns):
to_remove.add(idx2)
to_remove = sorted(to_remove)
while to_remove:
del dicts[to_remove.pop()]
return dicts
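
# Illustrative: with deduplicate_ignore_columns=['fetched_at'] (a hypothetical
# column name), rows that differ only in 'fetched_at' collapse to their first
# occurrence, and the surviving rows keep their original order.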


def deduplicate_dicts(
dicts: Sequence[dict],
deduplicate_mode: personal_data.data.DeduplicateMode,
deduplicate_ignore_columns: list[str],
) -> tuple[Sequence[dict], list[str]]:
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
fieldnames = []
for d in dicts:
for k in d.keys():
if k not in fieldnames:
fieldnames.append(k)
del k
del d
if deduplicate_mode == personal_data.data.DeduplicateMode.ONLY_LATEST:
while len(dicts) >= 2 and equals_without_fields(
dicts[-1],
dicts[-2],
deduplicate_ignore_columns,
):
del dicts[-1]
elif deduplicate_mode == personal_data.data.DeduplicateMode.BY_ALL_COLUMNS:
dicts = deduplicate_by_ignoring_certain_fields(
dicts,
deduplicate_ignore_columns,
)
elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
dicts = set(dicts)
dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
return dicts, fieldnames
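
# Mode summary, as implemented above: ONLY_LATEST trims trailing rows that
# repeat the previous row (ignoring the given columns), BY_ALL_COLUMNS removes
# duplicates anywhere in the list (also ignoring those columns), NONE keeps
# everything, and any other mode falls back to set-based deduplication. Rows
# are always re-sorted by their stringified field values afterwards.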


def normalize_dict(d: dict) -> frozendict:
return frozendict(
{k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
)
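
# Illustrative: normalize_dict({'count': 1, 'note': ''}) returns
# frozendict({'count': Decimal('1')}); values that parse to None are dropped
# and everything else goes through to_value. (Key names are hypothetical.)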


def extend_csv_file(
csv_file: Path,
new_dicts: list[dict],
deduplicate_mode: personal_data.data.DeduplicateMode,
deduplicate_ignore_columns: list[str],
) -> dict:
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
dicts = []
try:
with open(csv_file) as csvfile:
reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
    except FileNotFoundError:
        logger.info('Creating file: %s', csv_file)
original_num_dicts = len(dicts)
dicts += [normalize_dict(d) for d in new_dicts]
del new_dicts
dicts, fieldnames = deduplicate_dicts(
dicts,
deduplicate_mode,
deduplicate_ignore_columns,
)
csvfile_in_memory = io.StringIO()
writer = csv.DictWriter(
csvfile_in_memory,
fieldnames=fieldnames,
dialect=CSV_DIALECT,
)
writer.writeheader()
for d in dicts:
writer.writerow(d)
output_csv = csvfile_in_memory.getvalue()
del writer, csvfile_in_memory
    csv_file.parent.mkdir(parents=True, exist_ok=True)
with open(csv_file, 'w') as csvfile:
csvfile.write(output_csv)
del csvfile
logger.info(
'Extended CSV "%s" from %d to %d lines',
csv_file,
original_num_dicts,
len(dicts),
)
return {
'extended': original_num_dicts != len(dicts),
'input_lines': original_num_dicts,
'output_lines': len(dicts),
'dicts': dicts,
}
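
# Typical call (illustrative sketch; the dataset name and columns are hypothetical):
#   status = extend_csv_file(
#       OUTPUT_PATH / 'example_dataset.csv',
#       [{'time': '2024-07-27', 'value': '1'}],
#       deduplicate_mode=personal_data.data.DeduplicateMode.ONLY_LATEST,
#       deduplicate_ignore_columns=[],
#   )
#   # status['extended'] is True when the merged row count changed.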


STANDARD_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
    # "Accept": "application/json, text/plain, */*",
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate, br',
}


if cfscrape:

    class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
        pass


def get_session(
cookiejar: Sequence,
*,
with_cfscrape: bool,
ignore_cache: bool,
) -> requests.Session:
assert isinstance(with_cfscrape, bool)
session_class = requests_cache.CachedSession
if ignore_cache:
        logger.warning('HTTP cache disabled')
return requests.Session()
if cfscrape:
session_class = CachedCfScrape
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar)
for cookie in cookiejar:
session.cookies.set_cookie(cookie)
return session
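
# Note: when ignore_cache is set, the plain requests.Session above is returned
# before any cookies are attached, so it starts with an empty cookie jar.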


def available_scrapers() -> list[type[personal_data.data.Scraper]]:
subclasses = []
class_queue = [personal_data.data.Scraper]
while class_queue:
clazz = class_queue.pop()
if inspect.isabstract(clazz):
class_queue.extend(clazz.__subclasses__())
else:
subclasses.append(clazz)
return subclasses


def available_scraper_names() -> list[str]:
return [scraper_cls.__name__ for scraper_cls in available_scrapers()]


def main(
scraper_filter: frozenset[str],
*,
use_cookiejar: bool,
ignore_cache: bool,
notification_types: frozenset[notification.NotificationType],
) -> None:
if use_cookiejar:
cookiejar = browsercookie.firefox()
logger.info('Got cookiejar from firefox: %s cookies', len(cookiejar))
else:
cookiejar = []
logger.warning('No cookiejar is used')
if len(notification_types) == 0:
logger.info('No notifications enabled: Notifications will not be sent!')

    for scraper_cls in available_scrapers():
        if scraper_cls.__name__ not in scraper_filter:
            continue
        session = get_session(
            cookiejar,
            with_cfscrape=scraper_cls.requires_cfscrape(),
            ignore_cache=ignore_cache,
        )
        scraper = scraper_cls(session)
logger.info(
'Running %s, appending to "%s"',
scraper_cls.__name__,
scraper.dataset_name,
)
result_rows = []
try:
for result in scraper.scrape():
result_rows.append(result)
del result
except requests.exceptions.HTTPError:
logger.exception('Failed in running %s', scraper_cls.__name__)
continue
status = extend_csv_file(
OUTPUT_PATH / f'{scraper.dataset_name}.csv',
result_rows,
deduplicate_mode=scraper.deduplicate_mode,
deduplicate_ignore_columns=scraper.deduplicate_ignore_columns(),
)
logger.info('Scraper done: %s', scraper.dataset_name)
if status['extended']:
notification.send_notifications(
session,
scraper_cls.__name__,
status['dicts'][-1],
notification_types,
)
del scraper, session