1
0

Compare commits

..

No commits in common. "167b2c8f276f7c812c1ced07ff48ddee79f06770" and "8ca9c36a6e8194bfef3c1fd7cb488320632684c0" have entirely different histories.

7 changed files with 40 additions and 96 deletions

View File

@@ -7,10 +7,6 @@ from personal_data.notification import NotificationType
def parse_arguments(): def parse_arguments():
available_scraper_names = personal_data.main.available_scraper_names() available_scraper_names = personal_data.main.available_scraper_names()
if len(available_scraper_names) == 0:
msg = 'Failed to load any scrapers'
raise Exception(msg)
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
epilog='Available fetchers: ' + ' '.join(available_scraper_names), epilog='Available fetchers: ' + ' '.join(available_scraper_names),
) )

View File

@@ -71,11 +71,7 @@ def has_data_attribute(e) -> bool:
def normalize_soup_slightly( def normalize_soup_slightly(
soup, soup, classes=True, scripts=True, comments=True, data_attributes=True,
classes=True,
scripts=True,
comments=True,
data_attributes=True,
): ):
"""Perform soup normalization.""" """Perform soup normalization."""
# Little if any content # Little if any content

View File

@@ -1,12 +1,18 @@
import csv
import datetime
import decimal
import inspect import inspect
import logging import io
from collections.abc import Sequence
from pathlib import Path from pathlib import Path
import logging
from collections.abc import Iterable, Mapping, Sequence
from decimal import Decimal
import requests import requests
import requests_cache import requests_cache
from frozendict import frozendict
from . import data, notification from . import notification, data
from .util import * from .util import *
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -53,18 +59,17 @@ def get_session(
assert isinstance(with_cfscrape, bool) assert isinstance(with_cfscrape, bool)
session_class = requests_cache.CachedSession session_class = requests_cache.CachedSession
if ignore_cache: if ignore_cache:
logger.warning('HTTP cache disabled') logger.warn('HTTP cache disabled')
return requests.Session() return requests.Session()
if cfscrape: if cfscrape:
session_class = CachedCfScrape session_class = CachedCfScrape
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=datetime.timedelta(days=1)) session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar)
for cookie in cookiejar: for cookie in cookiejar:
session.cookies.set_cookie(cookie) session.cookies.set_cookie(cookie)
return session return session
def available_scrapers() -> list[type[data.Scraper]]: def available_scrapers() -> list[type[data.Scraper]]:
from . import fetchers # noqa
subclasses = [] subclasses = []
class_queue = [data.Scraper] class_queue = [data.Scraper]
while class_queue: while class_queue:

View File

@@ -31,8 +31,7 @@ def parse_duration(text: str) -> datetime.timedelta:
def parse_response_datetime(response) -> datetime.datetime: def parse_response_datetime(response) -> datetime.datetime:
return datetime.datetime.strptime( return datetime.datetime.strptime(
response.headers['Date'], response.headers['Date'], FORMAT_DATE_HEADER,
FORMAT_DATE_HEADER,
).replace(tzinfo=datetime.UTC) ).replace(tzinfo=datetime.UTC)

View File

@@ -1,55 +1,41 @@
import csv import csv
import datetime import datetime
import decimal import decimal
import inspect
import io import io
import logging
import typing
import urllib.parse
from collections.abc import Callable, Iterable, Mapping, Sequence
from decimal import Decimal
from pathlib import Path from pathlib import Path
import logging
from collections.abc import Iterable, Mapping, Sequence
from decimal import Decimal
import requests
import requests_cache
from frozendict import frozendict from frozendict import frozendict
from . import data from . import notification, data
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CSV_DIALECT = 'one_true_dialect' CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True) csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
T = typing.TypeVar('T') def try_value(fn, s: str) -> object:
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try: try:
return fn(s) return fn(s)
except (ValueError, decimal.InvalidOperation): except (ValueError, decimal.InvalidOperation):
return None return None
def csv_str_to_value( def to_value(s: str) -> object:
s: str,
) -> (
str
| Decimal
| datetime.date
| datetime.datetime
| urllib.parse.ParseResult
| bool
| None
):
s = s.strip() s = s.strip()
if len(s) == 0: if len(s) == 0:
return None return None
if (v_decimal := try_value(Decimal, s)) is not None: if (v := try_value(Decimal, s)) is not None:
return v_decimal return v
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None: if v := try_value(datetime.date.fromisoformat, s):
return v_date return v
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None: if v := try_value(datetime.datetime.fromisoformat, s):
return v_datetime return v
if s.startswith(('http://', 'https://')):
return urllib.parse.urlparse(s)
if s.lower() == 'false': if s.lower() == 'false':
return False return False
if s.lower() == 'true': if s.lower() == 'true':
@@ -58,11 +44,6 @@ def csv_str_to_value(
return None return None
return s return s
def csv_safe_value(v: object) -> str:
if isinstance(v, urllib.parse.ParseResult):
return v.geturl()
return str(v)
def equals_without_fields( def equals_without_fields(
a: Mapping[str, object], a: Mapping[str, object],
@@ -135,7 +116,7 @@ def deduplicate_dicts(
def normalize_dict(d: dict) -> frozendict: def normalize_dict(d: dict) -> frozendict:
return frozendict( return frozendict(
{k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None}, {k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
) )
@@ -146,7 +127,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
for row in reader: for row in reader:
for k in list(row.keys()): for k in list(row.keys()):
orig = row[k] orig = row[k]
row[k] = csv_str_to_value(orig) row[k] = to_value(orig)
if row[k] is None: if row[k] is None:
del row[k] del row[k]
del k, orig del k, orig
@@ -155,7 +136,6 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
del csvfile del csvfile
return dicts return dicts
def extend_csv_file( def extend_csv_file(
csv_file: Path, csv_file: Path,
new_dicts: list[dict], new_dicts: list[dict],
@@ -188,9 +168,7 @@ def extend_csv_file(
) )
writer.writeheader() writer.writeheader()
for d in dicts: for d in dicts:
writable_d = {k:csv_safe_value(v) for k,v in d.items()} writer.writerow(d)
writer.writerow(writable_d)
del d, writable_d
output_csv = csvfile_in_memory.getvalue() output_csv = csvfile_in_memory.getvalue()
del writer, csvfile_in_memory del writer, csvfile_in_memory

View File

@@ -1,57 +1,35 @@
import argparse import argparse
import datetime import logging
import urllib.parse
import icalendar import icalendar
import datetime
import csv
from personal_data.util import load_csv_file from personal_data.util import load_csv_file
NOW = datetime.datetime.now(tz = datetime.UTC) NOW = datetime.datetime.now(tz = datetime.UTC)
def parse_arguments(): def parse_arguments():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('data_folder') parser.add_argument('data_folder')
parser.add_argument('output_file') parser.add_argument('output_file')
return parser.parse_args() return parser.parse_args()
def generate_calendar(rows: list[dict]) -> icalendar.Calendar: def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
cal = icalendar.Calendar() cal = icalendar.Calendar()
cal.add('prodid', '-//personal_data_calendar//example.org//') cal.add('prodid', '-//personal_data_calendar//example.org//')
cal.add('version', '2.0') cal.add('version', '2.0')
for event_data in rows: for event_data in rows:
print(event_data)
# Select data # Select data
possible_time_keys = [ print(event_data)
k for k, v in event_data.items() if isinstance(v, datetime.date)
]
possible_name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
possible_image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
]
date = event_data[possible_time_keys[0]] if possible_time_keys else None
title = event_data[possible_name_keys[0]]
image = event_data[possible_image_keys[0]] if possible_image_keys else None
if date is None:
continue
description = '\n\n'.join(event_data[k] for k in possible_name_keys)
# Create event # Create event
event = icalendar.Event() event = icalendar.Event()
event.add('summary', title) event.add('summary', f'Event {i}')
event.add('description', description) event.add('dtstart', datetime.datetime(2005,4,4,8,0,0,tzinfo=datetime.UTC))
event.add('dtstart', date) event.add('dtend', datetime.datetime(2005,4,4,10,0,0,tzinfo=datetime.UTC))
event.add('dtend', date)
event.add('created', NOW) event.add('created', NOW)
event.add('dtstamp', NOW) event.add('dtstamp', NOW)
if image:
event.add('image', image.geturl())
cal.add_component(event) cal.add_component(event)
del event del event
@@ -69,6 +47,5 @@ def main():
with open(args.output_file, 'wb') as f: with open(args.output_file, 'wb') as f:
f.write(calendar.to_ical()) f.write(calendar.to_ical())
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -1,7 +0,0 @@
import personal_data.main
def test_available():
names = personal_data.main.available_scraper_names()
assert len(names) > 0