
Compare commits


5 Commits

SHA1        Message              Date
167b2c8f27  csv_safe_value       2024-08-25 21:07:52 +02:00
            (Some checks failed: Test Python / Test (push) failing after 29s)
36b372fb2d  Import fetchers      2024-08-25 20:56:47 +02:00
595640efdf  Ruff                 2024-08-25 20:50:03 +02:00
aebf3c7df4  Include description  2024-08-25 20:49:49 +02:00
7c1d6003f4  Parse urls           2024-08-25 20:38:16 +02:00
7 changed files with 95 additions and 39 deletions


@@ -7,6 +7,10 @@ from personal_data.notification import NotificationType
 def parse_arguments():
     available_scraper_names = personal_data.main.available_scraper_names()
+    if len(available_scraper_names) == 0:
+        msg = 'Failed to load any scrapers'
+        raise Exception(msg)
     parser = argparse.ArgumentParser(
         epilog='Available fetchers: ' + ' '.join(available_scraper_names),
     )


@@ -71,7 +71,11 @@ def has_data_attribute(e) -> bool:
 def normalize_soup_slightly(
-    soup, classes=True, scripts=True, comments=True, data_attributes=True,
+    soup,
+    classes=True,
+    scripts=True,
+    comments=True,
+    data_attributes=True,
 ):
     """Perform soup normalization."""
     # Little if any content


@@ -1,18 +1,12 @@
-import csv
-import datetime
-import decimal
 import inspect
-import io
-from pathlib import Path
 import logging
-from collections.abc import Iterable, Mapping, Sequence
-from decimal import Decimal
+from collections.abc import Sequence
+from pathlib import Path

 import requests
 import requests_cache
-from frozendict import frozendict

-from . import notification, data
+from . import data, notification
 from .util import *

 logger = logging.getLogger(__name__)
@@ -59,17 +53,18 @@ def get_session(
     assert isinstance(with_cfscrape, bool)
     session_class = requests_cache.CachedSession
     if ignore_cache:
-        logger.warn('HTTP cache disabled')
+        logger.warning('HTTP cache disabled')
         return requests.Session()
     if cfscrape:
         session_class = CachedCfScrape
-    session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar)
+    session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=datetime.timedelta(days=1))
     for cookie in cookiejar:
         session.cookies.set_cookie(cookie)
     return session

 def available_scrapers() -> list[type[data.Scraper]]:
+    from . import fetchers  # noqa
     subclasses = []
     class_queue = [data.Scraper]
     while class_queue:
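Note: the `from . import fetchers  # noqa` line exists purely for its side effect: importing the fetchers package defines the Scraper subclasses that the walk below discovers. Only the first lines of the loop are visible in this hunk; a plausible reconstruction of the whole function under that assumption (a sketch, not the committed code):

    def available_scrapers() -> list[type[data.Scraper]]:
        from . import fetchers  # noqa  (side effect: defines Scraper subclasses)

        subclasses = []
        class_queue = [data.Scraper]
        while class_queue:
            cls = class_queue.pop()
            # Keep concrete scrapers; keep walking the subclass tree.
            if not inspect.isabstract(cls):
                subclasses.append(cls)
            class_queue.extend(cls.__subclasses__())
        return subclasses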


@@ -31,7 +31,8 @@ def parse_duration(text: str) -> datetime.timedelta:
 def parse_response_datetime(response) -> datetime.datetime:
     return datetime.datetime.strptime(
-        response.headers['Date'], FORMAT_DATE_HEADER,
+        response.headers['Date'],
+        FORMAT_DATE_HEADER,
     ).replace(tzinfo=datetime.UTC)


@@ -1,41 +1,55 @@
 import csv
 import datetime
 import decimal
+import inspect
 import io
-from pathlib import Path
 import logging
-from collections.abc import Iterable, Mapping, Sequence
+import typing
+import urllib.parse
+from collections.abc import Callable, Iterable, Mapping, Sequence
 from decimal import Decimal
+from pathlib import Path

+import requests
+import requests_cache
 from frozendict import frozendict

-from . import notification, data
+from . import data

 logger = logging.getLogger(__name__)

 CSV_DIALECT = 'one_true_dialect'
 csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)

-def try_value(fn, s: str) -> object:
+T = typing.TypeVar('T')
+
+
+def try_value(fn: Callable[[str], T], s: str) -> T | None:
     try:
         return fn(s)
     except (ValueError, decimal.InvalidOperation):
         return None

-def to_value(s: str) -> object:
+def csv_str_to_value(
+    s: str,
+) -> (
+    str
+    | Decimal
+    | datetime.date
+    | datetime.datetime
+    | urllib.parse.ParseResult
+    | bool
+    | None
+):
     s = s.strip()
     if len(s) == 0:
         return None
-    if (v := try_value(Decimal, s)) is not None:
-        return v
-    if v := try_value(datetime.date.fromisoformat, s):
-        return v
-    if v := try_value(datetime.datetime.fromisoformat, s):
-        return v
+    if (v_decimal := try_value(Decimal, s)) is not None:
+        return v_decimal
+    if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
+        return v_date
+    if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
+        return v_datetime
+    if s.startswith(('http://', 'https://')):
+        return urllib.parse.urlparse(s)
     if s.lower() == 'false':
         return False
     if s.lower() == 'true':
@@ -44,6 +58,11 @@ def to_value(s: str) -> object:
         return None
     return s

+def csv_safe_value(v: object) -> str:
+    if isinstance(v, urllib.parse.ParseResult):
+        return v.geturl()
+    return str(v)
+

 def equals_without_fields(
     a: Mapping[str, object],
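Note: together, the renamed csv_str_to_value() and the new csv_safe_value() make the URL handling round-trip: parse on read, serialize on write. An illustrative session (values invented for the example):

    import urllib.parse
    from personal_data.util import csv_safe_value, csv_str_to_value

    v = csv_str_to_value('https://example.org/cover.png')
    assert isinstance(v, urllib.parse.ParseResult)   # URLs now parse
    assert csv_safe_value(v) == 'https://example.org/cover.png'

    assert csv_str_to_value('true') is True          # booleans still fold
    assert str(csv_str_to_value('1.50')) == '1.50'   # Decimal keeps scale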
@@ -116,7 +135,7 @@ def deduplicate_dicts(
 def normalize_dict(d: dict) -> frozendict:
     return frozendict(
-        {k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
+        {k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
     )
@@ -127,7 +146,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
     for row in reader:
         for k in list(row.keys()):
             orig = row[k]
-            row[k] = to_value(orig)
+            row[k] = csv_str_to_value(orig)
             if row[k] is None:
                 del row[k]
             del k, orig
@@ -136,6 +155,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
         del csvfile
     return dicts

 def extend_csv_file(
     csv_file: Path,
     new_dicts: list[dict],
@@ -168,11 +188,13 @@ def extend_csv_file(
     )
     writer.writeheader()
     for d in dicts:
-        writer.writerow(d)
+        writable_d = {k:csv_safe_value(v) for k,v in d.items()}
+        writer.writerow(writable_d)
+        del d, writable_d
     output_csv = csvfile_in_memory.getvalue()
     del writer, csvfile_in_memory

-    csv_file.parent.mkdir(parents=True,exist_ok=True)
+    csv_file.parent.mkdir(parents=True, exist_ok=True)
     with open(csv_file, 'w') as csvfile:
         csvfile.write(output_csv)
     del csvfile


@@ -1,12 +1,13 @@
 import argparse
-import logging
-import icalendar
 import datetime
-import csv
+import urllib.parse
+
+import icalendar

 from personal_data.util import load_csv_file

-NOW = datetime.datetime.now(tz = datetime.UTC)
+NOW = datetime.datetime.now(tz=datetime.UTC)

 def parse_arguments():
     parser = argparse.ArgumentParser()
@@ -14,22 +15,43 @@ def parse_arguments():
     parser.add_argument('output_file')
     return parser.parse_args()

 def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
     cal = icalendar.Calendar()
     cal.add('prodid', '-//personal_data_calendar//example.org//')
     cal.add('version', '2.0')

     for event_data in rows:
-        # Select data
         print(event_data)

+        # Select data
+        possible_time_keys = [
+            k for k, v in event_data.items() if isinstance(v, datetime.date)
+        ]
+        possible_name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
+        possible_image_keys = [
+            k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
+        ]
+
+        date = event_data[possible_time_keys[0]] if possible_time_keys else None
+        title = event_data[possible_name_keys[0]]
+        image = event_data[possible_image_keys[0]] if possible_image_keys else None
+
+        if date is None:
+            continue
+
+        description = '\n\n'.join(event_data[k] for k in possible_name_keys)
+
         # Create event
         event = icalendar.Event()
-        event.add('summary', f'Event {i}')
-        event.add('dtstart', datetime.datetime(2005,4,4,8,0,0,tzinfo=datetime.UTC))
-        event.add('dtend', datetime.datetime(2005,4,4,10,0,0,tzinfo=datetime.UTC))
+        event.add('summary', title)
+        event.add('description', description)
+        event.add('dtstart', date)
+        event.add('dtend', date)
         event.add('created', NOW)
         event.add('dtstamp', NOW)
+        if image:
+            event.add('image', image.geturl())

         cal.add_component(event)
         del event
@@ -47,5 +69,6 @@ def main():
     with open(args.output_file, 'wb') as f:
         f.write(calendar.to_ical())

 if __name__ == '__main__':
     main()
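Note: event fields are now derived from each row's value types (dates, strings, parsed URLs) instead of being hard-coded. A sketch of a row that yields a full event; key names are invented for illustration, only the value types matter:

    import datetime
    import urllib.parse

    row = {
        'title': 'Finished a game',            # str -> summary and description
        'played': datetime.date(2024, 8, 25),  # date -> dtstart and dtend
        'cover': urllib.parse.urlparse('https://example.org/c.png'),  # -> image
    }
    print(generate_calendar([row]).to_ical().decode())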

test/test_main.py (new file, +7)

@@ -0,0 +1,7 @@
+import personal_data.main
+
+
+def test_available():
+    names = personal_data.main.available_scraper_names()
+    assert len(names) > 0
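Note: this test exercises the same registry that the new parse_arguments() guard checks. Assuming a standard pytest setup, it runs with:

    python -m pytest test/test_main.py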