1
0

Compare commits

...

5 Commits

Author SHA1 Message Date
167b2c8f27
csv_safe_value
Some checks failed
Test Python / Test (push) Failing after 29s
2024-08-25 21:07:52 +02:00
36b372fb2d
Import fetchers 2024-08-25 20:56:47 +02:00
595640efdf
Ruff 2024-08-25 20:50:03 +02:00
aebf3c7df4
Include description 2024-08-25 20:49:49 +02:00
7c1d6003f4
Parse urls 2024-08-25 20:38:16 +02:00
7 changed files with 95 additions and 39 deletions

View File

@ -7,6 +7,10 @@ from personal_data.notification import NotificationType
def parse_arguments():
available_scraper_names = personal_data.main.available_scraper_names()
if len(available_scraper_names) == 0:
msg = 'Failed to load any scrapers'
raise Exception(msg)
parser = argparse.ArgumentParser(
epilog='Available fetchers: ' + ' '.join(available_scraper_names),
)

View File

@ -71,7 +71,11 @@ def has_data_attribute(e) -> bool:
def normalize_soup_slightly(
soup, classes=True, scripts=True, comments=True, data_attributes=True,
soup,
classes=True,
scripts=True,
comments=True,
data_attributes=True,
):
"""Perform soup normalization."""
# Little if any content

View File

@ -1,18 +1,12 @@
import csv
import datetime
import decimal
import inspect
import io
from pathlib import Path
import logging
from collections.abc import Iterable, Mapping, Sequence
from decimal import Decimal
from collections.abc import Sequence
from pathlib import Path
import requests
import requests_cache
from frozendict import frozendict
from . import notification, data
from . import data, notification
from .util import *
logger = logging.getLogger(__name__)
@ -59,17 +53,18 @@ def get_session(
assert isinstance(with_cfscrape, bool)
session_class = requests_cache.CachedSession
if ignore_cache:
logger.warn('HTTP cache disabled')
logger.warning('HTTP cache disabled')
return requests.Session()
if cfscrape:
session_class = CachedCfScrape
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar)
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=datetime.timedelta(days=1))
for cookie in cookiejar:
session.cookies.set_cookie(cookie)
return session
def available_scrapers() -> list[type[data.Scraper]]:
from . import fetchers # noqa
subclasses = []
class_queue = [data.Scraper]
while class_queue:

View File

@ -31,7 +31,8 @@ def parse_duration(text: str) -> datetime.timedelta:
def parse_response_datetime(response) -> datetime.datetime:
return datetime.datetime.strptime(
response.headers['Date'], FORMAT_DATE_HEADER,
response.headers['Date'],
FORMAT_DATE_HEADER,
).replace(tzinfo=datetime.UTC)

View File

@ -1,41 +1,55 @@
import csv
import datetime
import decimal
import inspect
import io
from pathlib import Path
import logging
from collections.abc import Iterable, Mapping, Sequence
import typing
import urllib.parse
from collections.abc import Callable, Iterable, Mapping, Sequence
from decimal import Decimal
from pathlib import Path
import requests
import requests_cache
from frozendict import frozendict
from . import notification, data
from . import data
logger = logging.getLogger(__name__)
CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
def try_value(fn, s: str) -> object:
T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def to_value(s: str) -> object:
def csv_str_to_value(
s: str,
) -> (
str
| Decimal
| datetime.date
| datetime.datetime
| urllib.parse.ParseResult
| bool
| None
):
s = s.strip()
if len(s) == 0:
return None
if (v := try_value(Decimal, s)) is not None:
return v
if v := try_value(datetime.date.fromisoformat, s):
return v
if v := try_value(datetime.datetime.fromisoformat, s):
return v
if (v_decimal := try_value(Decimal, s)) is not None:
return v_decimal
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
return v_date
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
return v_datetime
if s.startswith(('http://', 'https://')):
return urllib.parse.urlparse(s)
if s.lower() == 'false':
return False
if s.lower() == 'true':
@ -44,6 +58,11 @@ def to_value(s: str) -> object:
return None
return s
def csv_safe_value(v: object) -> str:
if isinstance(v, urllib.parse.ParseResult):
return v.geturl()
return str(v)
def equals_without_fields(
a: Mapping[str, object],
@ -116,7 +135,7 @@ def deduplicate_dicts(
def normalize_dict(d: dict) -> frozendict:
return frozendict(
{k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
{k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
)
@ -127,7 +146,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = to_value(orig)
row[k] = csv_str_to_value(orig)
if row[k] is None:
del row[k]
del k, orig
@ -136,6 +155,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
del csvfile
return dicts
def extend_csv_file(
csv_file: Path,
new_dicts: list[dict],
@ -168,11 +188,13 @@ def extend_csv_file(
)
writer.writeheader()
for d in dicts:
writer.writerow(d)
writable_d = {k:csv_safe_value(v) for k,v in d.items()}
writer.writerow(writable_d)
del d, writable_d
output_csv = csvfile_in_memory.getvalue()
del writer, csvfile_in_memory
csv_file.parent.mkdir(parents=True,exist_ok=True)
csv_file.parent.mkdir(parents=True, exist_ok=True)
with open(csv_file, 'w') as csvfile:
csvfile.write(output_csv)
del csvfile

View File

@ -1,12 +1,13 @@
import argparse
import logging
import icalendar
import datetime
import csv
import urllib.parse
import icalendar
from personal_data.util import load_csv_file
NOW = datetime.datetime.now(tz = datetime.UTC)
NOW = datetime.datetime.now(tz=datetime.UTC)
def parse_arguments():
parser = argparse.ArgumentParser()
@ -14,22 +15,43 @@ def parse_arguments():
parser.add_argument('output_file')
return parser.parse_args()
def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
cal = icalendar.Calendar()
cal.add('prodid', '-//personal_data_calendar//example.org//')
cal.add('version', '2.0')
for event_data in rows:
# Select data
print(event_data)
# Select data
possible_time_keys = [
k for k, v in event_data.items() if isinstance(v, datetime.date)
]
possible_name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
possible_image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
]
date = event_data[possible_time_keys[0]] if possible_time_keys else None
title = event_data[possible_name_keys[0]]
image = event_data[possible_image_keys[0]] if possible_image_keys else None
if date is None:
continue
description = '\n\n'.join(event_data[k] for k in possible_name_keys)
# Create event
event = icalendar.Event()
event.add('summary', f'Event {i}')
event.add('dtstart', datetime.datetime(2005,4,4,8,0,0,tzinfo=datetime.UTC))
event.add('dtend', datetime.datetime(2005,4,4,10,0,0,tzinfo=datetime.UTC))
event.add('summary', title)
event.add('description', description)
event.add('dtstart', date)
event.add('dtend', date)
event.add('created', NOW)
event.add('dtstamp', NOW)
if image:
event.add('image', image.geturl())
cal.add_component(event)
del event
@ -47,5 +69,6 @@ def main():
with open(args.output_file, 'wb') as f:
f.write(calendar.to_ical())
if __name__ == '__main__':
main()

7
test/test_main.py Normal file
View File

@ -0,0 +1,7 @@
import personal_data.main
def test_available():
names = personal_data.main.available_scraper_names()
assert len(names) > 0