Compare commits

No commits in common. "167b2c8f276f7c812c1ced07ff48ddee79f06770" and "8ca9c36a6e8194bfef3c1fd7cb488320632684c0" have entirely different histories.

7 changed files with 40 additions and 96 deletions

View File

@@ -7,10 +7,6 @@ from personal_data.notification import NotificationType
 def parse_arguments():
     available_scraper_names = personal_data.main.available_scraper_names()
-    if len(available_scraper_names) == 0:
-        msg = 'Failed to load any scrapers'
-        raise Exception(msg)
     parser = argparse.ArgumentParser(
         epilog='Available fetchers: ' + ' '.join(available_scraper_names),
     )

View File

@@ -71,11 +71,7 @@ def has_data_attribute(e) -> bool:
 def normalize_soup_slightly(
-    soup,
-    classes=True,
-    scripts=True,
-    comments=True,
-    data_attributes=True,
+    soup, classes=True, scripts=True, comments=True, data_attributes=True,
 ):
     """Perform soup normalization."""
     # Little if any content

View File

@@ -1,12 +1,18 @@
 import csv
 import datetime
-import decimal
-import inspect
-import logging
-from collections.abc import Sequence
+import io
+from pathlib import Path
+import logging
+from collections.abc import Iterable, Mapping, Sequence
+from decimal import Decimal

 import requests
 import requests_cache
 from frozendict import frozendict

-from . import data, notification
+from . import notification, data
 from .util import *

 logger = logging.getLogger(__name__)
@@ -53,18 +59,17 @@ def get_session(
     assert isinstance(with_cfscrape, bool)
     session_class = requests_cache.CachedSession
     if ignore_cache:
-        logger.warning('HTTP cache disabled')
+        logger.warn('HTTP cache disabled')
         return requests.Session()
     if cfscrape:
         session_class = CachedCfScrape
-    session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=datetime.timedelta(days=1))
+    session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar)
     for cookie in cookiejar:
         session.cookies.set_cookie(cookie)
     return session

 def available_scrapers() -> list[type[data.Scraper]]:
     from . import fetchers  # noqa
     subclasses = []
     class_queue = [data.Scraper]
     while class_queue:
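
Side note on the second change above: dropping `expire_after` means entries in the `web_cache` backend are kept indefinitely by default. A minimal standalone sketch of the behaviour at stake, assuming requests_cache's documented `CachedSession` API; the cache path stands in for `OUTPUT_PATH / 'web_cache'`:

import datetime
import requests_cache

# Cached HTTP session mirroring the deleted base-commit line; entries older
# than a day are refetched instead of served from the SQLite cache.
session = requests_cache.CachedSession(
    'output/web_cache',
    expire_after=datetime.timedelta(days=1),
)
response = session.get('https://example.org/')
print(response.from_cache)  # False on the first call, True on later cache hits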

View File

@@ -31,8 +31,7 @@ def parse_duration(text: str) -> datetime.timedelta:
 def parse_response_datetime(response) -> datetime.datetime:
     return datetime.datetime.strptime(
-        response.headers['Date'],
-        FORMAT_DATE_HEADER,
+        response.headers['Date'], FORMAT_DATE_HEADER,
     ).replace(tzinfo=datetime.UTC)
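
For context, `FORMAT_DATE_HEADER` is defined elsewhere in this file and not shown in the diff; assuming the RFC 1123 format that HTTP `Date` headers use, the call above behaves like this sketch:

import datetime

FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT'  # assumed value, not taken from the diff

parsed = datetime.datetime.strptime(
    'Mon, 04 Apr 2005 08:00:00 GMT', FORMAT_DATE_HEADER,
).replace(tzinfo=datetime.UTC)
assert parsed == datetime.datetime(2005, 4, 4, 8, 0, 0, tzinfo=datetime.UTC)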

View File

@@ -1,55 +1,41 @@
 import csv
 import datetime
 import decimal
 import inspect
 import io
-import logging
-import typing
-import urllib.parse
-from collections.abc import Callable, Iterable, Mapping, Sequence
-from decimal import Decimal
 from pathlib import Path
+import logging
+from collections.abc import Iterable, Mapping, Sequence
+from decimal import Decimal

 import requests
 import requests_cache
 from frozendict import frozendict

-from . import data
+from . import notification, data

 logger = logging.getLogger(__name__)

 CSV_DIALECT = 'one_true_dialect'
 csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)

-T = typing.TypeVar('T')
-def try_value(fn: Callable[[str], T], s: str) -> T | None:
+def try_value(fn, s: str) -> object:
     try:
         return fn(s)
     except (ValueError, decimal.InvalidOperation):
         return None

-def csv_str_to_value(
-    s: str,
-) -> (
-    str
-    | Decimal
-    | datetime.date
-    | datetime.datetime
-    | urllib.parse.ParseResult
-    | bool
-    | None
-):
+def to_value(s: str) -> object:
     s = s.strip()
     if len(s) == 0:
         return None
-    if (v_decimal := try_value(Decimal, s)) is not None:
-        return v_decimal
-    if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
-        return v_date
-    if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
-        return v_datetime
-    if s.startswith(('http://', 'https://')):
-        return urllib.parse.urlparse(s)
+    if (v := try_value(Decimal, s)) is not None:
+        return v
+    if v := try_value(datetime.date.fromisoformat, s):
+        return v
+    if v := try_value(datetime.datetime.fromisoformat, s):
+        return v
     if s.lower() == 'false':
         return False
     if s.lower() == 'true':
@@ -58,11 +44,6 @@ def csv_str_to_value(
         return None
     return s

-def csv_safe_value(v: object) -> str:
-    if isinstance(v, urllib.parse.ParseResult):
-        return v.geturl()
-    return str(v)

 def equals_without_fields(
     a: Mapping[str, object],
@@ -135,7 +116,7 @@ def deduplicate_dicts(
 def normalize_dict(d: dict) -> frozendict:
     return frozendict(
-        {k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
+        {k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
     )
@@ -146,7 +127,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
     for row in reader:
         for k in list(row.keys()):
             orig = row[k]
-            row[k] = csv_str_to_value(orig)
+            row[k] = to_value(orig)
             if row[k] is None:
                 del row[k]
             del k, orig
@@ -155,7 +136,6 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
     del csvfile
     return dicts

 def extend_csv_file(
     csv_file: Path,
     new_dicts: list[dict],
@@ -188,9 +168,7 @@ def extend_csv_file(
     )
     writer.writeheader()
     for d in dicts:
-        writable_d = {k:csv_safe_value(v) for k,v in d.items()}
-        writer.writerow(writable_d)
-        del d, writable_d
+        writer.writerow(d)
     output_csv = csvfile_in_memory.getvalue()
     del writer, csvfile_in_memory
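
For reference, `to_value` keeps the coercion order of the old `csv_str_to_value` minus the URL branch: `Decimal` first, then ISO dates and datetimes, then booleans, with the stripped string as the fallback. A usage sketch against the head-commit version above:

from decimal import Decimal
import datetime

assert to_value('3.14') == Decimal('3.14')
assert to_value('2024-01-01') == datetime.date(2024, 1, 1)
assert to_value('false') is False
assert to_value('') is None  # empty cells are dropped by normalize_dict and load_csv_file
assert to_value('https://example.org/') == 'https://example.org/'  # URL parsing was removed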

View File

@@ -1,57 +1,35 @@
 import argparse
-import datetime
-import urllib.parse
 import logging
 import icalendar
+import datetime
+import csv

 from personal_data.util import load_csv_file

 NOW = datetime.datetime.now(tz = datetime.UTC)

 def parse_arguments():
     parser = argparse.ArgumentParser()
     parser.add_argument('data_folder')
     parser.add_argument('output_file')
     return parser.parse_args()

 def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
     cal = icalendar.Calendar()
     cal.add('prodid', '-//personal_data_calendar//example.org//')
     cal.add('version', '2.0')

     for event_data in rows:
         print(event_data)
-        # Select data
-        possible_time_keys = [
-            k for k, v in event_data.items() if isinstance(v, datetime.date)
-        ]
-        possible_name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
-        possible_image_keys = [
-            k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
-        ]
-        date = event_data[possible_time_keys[0]] if possible_time_keys else None
-        title = event_data[possible_name_keys[0]]
-        image = event_data[possible_image_keys[0]] if possible_image_keys else None
-        if date is None:
-            continue
-        description = '\n\n'.join(event_data[k] for k in possible_name_keys)
-        print(event_data)

         # Create event
         event = icalendar.Event()
-        event.add('summary', title)
-        event.add('description', description)
-        event.add('dtstart', date)
-        event.add('dtend', date)
+        event.add('summary', f'Event {i}')
+        event.add('dtstart', datetime.datetime(2005,4,4,8,0,0,tzinfo=datetime.UTC))
+        event.add('dtend', datetime.datetime(2005,4,4,10,0,0,tzinfo=datetime.UTC))
         event.add('created', NOW)
         event.add('dtstamp', NOW)
-        if image:
-            event.add('image', image.geturl())
         cal.add_component(event)
         del event
@@ -69,6 +47,5 @@ def main():
     with open(args.output_file, 'wb') as f:
         f.write(calendar.to_ical())

 if __name__ == '__main__':
     main()
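
For context, a self-contained sketch of what the head commit reduces generate_calendar to: one placeholder event per row with hard-coded times, serialized via `to_ical()`. The output path and summary text here are assumptions for illustration:

import datetime
import icalendar

cal = icalendar.Calendar()
cal.add('prodid', '-//personal_data_calendar//example.org//')
cal.add('version', '2.0')

event = icalendar.Event()
event.add('summary', 'Example event')  # head commit uses f'Event {i}', with i undefined in the loop shown above
event.add('dtstart', datetime.datetime(2005, 4, 4, 8, 0, 0, tzinfo=datetime.UTC))
event.add('dtend', datetime.datetime(2005, 4, 4, 10, 0, 0, tzinfo=datetime.UTC))
cal.add_component(event)

with open('calendar.ics', 'wb') as f:
    f.write(cal.to_ical())  # to_ical() returns bytes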

View File

@@ -1,7 +0,0 @@
-import personal_data.main
-
-def test_available():
-    names = personal_data.main.available_scraper_names()
-    assert len(names) > 0