# personal-data/personal_data/main.py

import csv
import datetime
import decimal
import inspect
import io
import logging
from collections.abc import Iterable, Mapping, Sequence
from decimal import Decimal

import requests
import requests_cache
from frozendict import frozendict

logger = logging.getLogger(__name__)

try:
    import cfscrape
except ImportError:
    cfscrape = None
    logger.warning('cfscrape not installed: Certain fetchers might not work')

try:
    import browsercookie
except ImportError:
    logger.warning('browsercookie not installed: Certain fetchers might not work')
    browsercookie = None

import personal_data.data
import personal_data.fetchers  # Imported for its side effect: defining the Scraper subclasses.
from . import notification

CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)

logging.basicConfig()
logger.setLevel('INFO')


def try_value(fn, s: str) -> object:
    """Apply fn to s, returning None if the conversion fails."""
    try:
        return fn(s)
    except (ValueError, decimal.InvalidOperation):
        return None


def to_value(s: str) -> object:
    """Parse a string into the most specific value it can represent."""
    s = s.strip()
    if len(s) == 0:
        return None
    if (v := try_value(Decimal, s)) is not None:
        return v
    if v := try_value(datetime.date.fromisoformat, s):
        return v
    if v := try_value(datetime.datetime.fromisoformat, s):
        return v
    if s.lower() == 'false':
        return False
    if s.lower() == 'true':
        return True
    if s.lower() == 'none':
        return None
    return s
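
# Illustrative examples of the parsing precedence above (hypothetical inputs,
# not taken from the repository's data):
#
#     to_value('42.5')        # -> Decimal('42.5')
#     to_value('2024-06-03')  # -> datetime.date(2024, 6, 3)
#     to_value('TRUE')        # -> True
#     to_value('  ')          # -> None
#     to_value('hello')       # -> 'hello'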


def equals_without_fields(
    a: Mapping[str, object],
    b: Mapping[str, object],
    fields: Iterable[str] = frozenset(),
) -> bool:
    """Compare two mappings while disregarding the given fields."""
    a = dict(a)
    b = dict(b)
    for f in fields:
        # pop() rather than del: an ignored column may be absent from a row,
        # since extend_csv_file() drops keys whose values parse to None.
        a.pop(f, None)
        b.pop(f, None)
    return frozendict(a) == frozendict(b)
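
# Illustrative (hypothetical rows): two rows differing only in a timestamp
# compare equal once that column is ignored:
#
#     equals_without_fields(
#         {'id': 1, 'fetched': '2024-06-01'},
#         {'id': 1, 'fetched': '2024-06-02'},
#         ['fetched'],
#     )  # -> True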


def deduplicate_dicts(
    dicts: Sequence[dict],
    deduplicate_mode: personal_data.data.DeduplicateMode,
    deduplicate_ignore_columns: list[str],
) -> tuple[Sequence[dict], list[str]]:
    """Deduplicate rows according to deduplicate_mode, returning the surviving
    rows (sorted) and the union of their field names in first-seen order."""
    fieldnames = []
    for d in dicts:
        for k in d.keys():
            if k not in fieldnames:
                fieldnames.append(k)
            del k
        del d

    if deduplicate_mode == personal_data.data.DeduplicateMode.ONLY_LATEST:
        # Trim trailing rows that duplicate their predecessor.
        while len(dicts) >= 2 and equals_without_fields(
            dicts[-1],
            dicts[-2],
            deduplicate_ignore_columns,
        ):
            del dicts[-1]
    elif deduplicate_mode == personal_data.data.DeduplicateMode.BY_ALL_COLUMNS:
        # Quadratic scan that keeps the first occurrence of each equivalent row.
        to_remove = set()
        for idx1, first in enumerate(dicts):
            for second in dicts[idx1 + 1 :]:
                if equals_without_fields(first, second, deduplicate_ignore_columns):
                    to_remove.add(second)
        dicts = set(dicts) - to_remove
    elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
        dicts = set(dicts)

    dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
    return dicts, fieldnames
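
# Illustrative sketch of the modes (hypothetical rows): given
# [{'a': 1}, {'a': 1}, {'a': 2}], ONLY_LATEST only trims consecutive trailing
# duplicates (here nothing, since the last two rows differ), while
# BY_ALL_COLUMNS collapses every repeated row, leaving [{'a': 1}, {'a': 2}].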


def normalize_dict(d: dict) -> frozendict:
    """Normalize all values through to_value(), dropping keys whose values
    parse to None."""
    # The walrus expression avoids parsing each value twice.
    return frozendict(
        {k: v2 for k, v in d.items() if (v2 := to_value(str(v))) is not None},
    )
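
# Illustrative: normalize_dict({'n': ' 42 ', 'empty': ''}) drops the empty
# column and parses the number, yielding frozendict({'n': Decimal('42')}).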


def extend_csv_file(
    filename: str,
    new_dicts: list[dict],
    deduplicate_mode: personal_data.data.DeduplicateMode,
    deduplicate_ignore_columns: list[str],
) -> dict:
    """Append new_dicts to the CSV at filename, deduplicate, and rewrite the
    file. Returns a status dict describing whether the file grew."""
    dicts = []
    try:
        with open(filename) as csvfile:
            reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
            for row in reader:
                for k in list(row.keys()):
                    orig = row[k]
                    row[k] = to_value(orig)
                    if row[k] is None:
                        del row[k]
                    del k, orig
                dicts.append(frozendict(row))
                del row
        del csvfile
    except FileNotFoundError:
        logger.info('Creating file: %s', filename)

    original_num_dicts = len(dicts)
    dicts += [normalize_dict(d) for d in new_dicts]
    del new_dicts

    dicts, fieldnames = deduplicate_dicts(
        dicts,
        deduplicate_mode,
        deduplicate_ignore_columns,
    )

    # Serialize to memory first, so the file is only touched once the full
    # output has been produced.
    csvfile_in_memory = io.StringIO()
    writer = csv.DictWriter(
        csvfile_in_memory,
        fieldnames=fieldnames,
        dialect=CSV_DIALECT,
    )
    writer.writeheader()
    for d in dicts:
        writer.writerow(d)
    output_csv = csvfile_in_memory.getvalue()
    del writer, csvfile_in_memory

    with open(filename, 'w') as csvfile:
        csvfile.write(output_csv)
    del csvfile

    logger.info(
        'Extended CSV "%s" from %d to %d lines',
        filename,
        original_num_dicts,
        len(dicts),
    )
    return {
        'extended': original_num_dicts != len(dicts),
        'input_lines': original_num_dicts,
        'output_lines': len(dicts),
        'dicts': dicts,
    }
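
# Hedged usage sketch (file name and rows are hypothetical):
#
#     status = extend_csv_file(
#         'output/example.csv',
#         [{'name': 'a', 'value': '1'}],
#         deduplicate_mode=personal_data.data.DeduplicateMode.BY_ALL_COLUMNS,
#         deduplicate_ignore_columns=[],
#     )
#     if status['extended']:
#         print('new rows were appended')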

STANDARD_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
    # "Accept": "application/json, text/plain, */*",
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate, br',
}

if cfscrape:

    class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
        """Cloudflare-bypassing scraper with requests_cache caching mixed in."""


def get_session(
    cookiejar,
    *,
    with_cfscrape: bool,
    ignore_cache: bool,
) -> requests.Session:
    """Build a session: cached by default, Cloudflare-capable when cfscrape is
    installed, and uncached when ignore_cache is set."""
    # with_cfscrape is only validated here; cfscrape is applied whenever the
    # module is available.
    assert isinstance(with_cfscrape, bool)
    if ignore_cache:
        # ignore_cache takes precedence. requests.Session() accepts no
        # constructor arguments, so it is built separately from the cached
        # session classes; cookies are installed by the loop below instead of
        # being passed to the constructor.
        logger.warning('HTTP cache disabled')
        session = requests.Session()
    elif cfscrape:
        session = CachedCfScrape('output/web_cache')
    else:
        session = requests_cache.CachedSession('output/web_cache')
    for cookie in cookiejar:
        session.cookies.set_cookie(cookie)
    return session
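
# Usage sketch (illustrative): a cookieless, cached session; requests_cache
# writes its backend under output/web_cache.
#
#     session = get_session([], with_cfscrape=False, ignore_cache=False)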


def available_scrapers() -> list[type[personal_data.data.Scraper]]:
    """Collect the concrete Scraper subclasses by walking the class hierarchy,
    expanding abstract classes and keeping the concrete leaves."""
    subclasses = []
    class_queue = [personal_data.data.Scraper]
    while class_queue:
        clazz = class_queue.pop()
        if inspect.isabstract(clazz):
            class_queue.extend(clazz.__subclasses__())
        else:
            subclasses.append(clazz)
    return subclasses


def available_scraper_names() -> list[str]:
    return [scraper_cls.__name__ for scraper_cls in available_scrapers()]
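
# Discovery relies on the `import personal_data.fetchers` above: merely
# defining a Scraper subclass registers it via __subclasses__(). A
# hypothetical example (not part of this repository), assuming Scraper itself
# is abstract as the traversal implies:
#
#     class MyScraper(personal_data.data.Scraper):
#         dataset_name = 'my_dataset'
#         ...
#
#     'MyScraper' in available_scraper_names()  # -> True once defined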


def main(
    scraper_filter: frozenset[str],
    *,
    use_cookiejar: bool,
    ignore_cache: bool,
    notification_types: frozenset[notification.NotificationType],
) -> None:
    if use_cookiejar:
        cookiejar = browsercookie.firefox()
        logger.info('Got cookiejar from firefox: %s cookies', len(cookiejar))
    else:
        cookiejar = []
        logger.warning('No cookiejar is used')

    if len(notification_types) == 0:
        logger.info('No notifications enabled: Notifications will not be sent!')

    for scraper_cls in available_scrapers():
        # Apply the filter before building a session, so that filtered-out
        # scrapers cost nothing.
        if scraper_cls.__name__ not in scraper_filter:
            continue
        session = get_session(
            cookiejar,
            with_cfscrape=scraper_cls.requires_cfscrape(),
            ignore_cache=ignore_cache,
        )
        scraper = scraper_cls(session)
        logger.info(
            'Running %s, appending to "%s"',
            scraper_cls.__name__,
            scraper.dataset_name,
        )
        result_rows = []
        try:
            for result in scraper.scrape():
                result_rows.append(result)
                del result
        except requests.exceptions.HTTPError:
            logger.exception('Failed in running %s', scraper_cls.__name__)
            continue
        status = extend_csv_file(
            f'output/{scraper.dataset_name}.csv',
            result_rows,
            deduplicate_mode=scraper.deduplicate_mode,
            deduplicate_ignore_columns=scraper.deduplicate_ignore_columns,
        )
        logger.info('Scraper done: %s', scraper.dataset_name)

        if status['extended']:
            notification.send_notifications(
                session,
                scraper_cls.__name__,
                status['dicts'][-1],
                notification_types,
            )
        del scraper, session
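
# Hedged invocation sketch (argument values are illustrative; the actual CLI
# entry point lives elsewhere in the package):
#
#     main(
#         frozenset(available_scraper_names()),
#         use_cookiejar=False,
#         ignore_cache=False,
#         notification_types=frozenset(),
#     )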