Compare commits
5 Commits
8ca9c36a6e...167b2c8f27
167b2c8f27
36b372fb2d
595640efdf
aebf3c7df4
7c1d6003f4
@@ -7,6 +7,10 @@ from personal_data.notification import NotificationType


 def parse_arguments():
+    available_scraper_names = personal_data.main.available_scraper_names()
+    if len(available_scraper_names) == 0:
+        msg = 'Failed to load any scrapers'
+        raise Exception(msg)

     parser = argparse.ArgumentParser(
         epilog='Available fetchers: ' + ' '.join(available_scraper_names),
     )
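Aside: the hunk above makes parse_arguments() fail fast when no scrapers can be loaded and surfaces the loaded names in the --help epilog. A minimal, self-contained sketch of that behaviour (the scraper names below are stand-ins, not the project's real fetchers):

import argparse

available_scraper_names = ['example_fetcher_a', 'example_fetcher_b']  # stand-in for personal_data.main.available_scraper_names()
if len(available_scraper_names) == 0:
    msg = 'Failed to load any scrapers'
    raise Exception(msg)

parser = argparse.ArgumentParser(
    epilog='Available fetchers: ' + ' '.join(available_scraper_names),
)
print(parser.format_help())  # the epilog is rendered after the options section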
@@ -71,7 +71,11 @@ def has_data_attribute(e) -> bool:


 def normalize_soup_slightly(
-    soup, classes=True, scripts=True, comments=True, data_attributes=True,
+    soup,
+    classes=True,
+    scripts=True,
+    comments=True,
+    data_attributes=True,
 ):
     """Perform soup normalization."""
     # Little if any content
@@ -1,18 +1,12 @@
 import csv
 import datetime
-import decimal
-import inspect
-import io
-from pathlib import Path
 import logging
-from collections.abc import Iterable, Mapping, Sequence
-from decimal import Decimal
+from collections.abc import Sequence
+from pathlib import Path

 import requests
 import requests_cache
 from frozendict import frozendict

-from . import notification, data
+from . import data, notification
 from .util import *

 logger = logging.getLogger(__name__)
@@ -59,17 +53,18 @@ def get_session(
     assert isinstance(with_cfscrape, bool)
     session_class = requests_cache.CachedSession
     if ignore_cache:
-        logger.warn('HTTP cache disabled')
+        logger.warning('HTTP cache disabled')
         return requests.Session()
     if cfscrape:
         session_class = CachedCfScrape
-    session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar)
+    session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=datetime.timedelta(days=1))
     for cookie in cookiejar:
         session.cookies.set_cookie(cookie)
     return session


 def available_scrapers() -> list[type[data.Scraper]]:
     from . import fetchers  # noqa
     subclasses = []
     class_queue = [data.Scraper]
     while class_queue:
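Aside: besides replacing the deprecated logger.warn with logger.warning, the hunk above gives the HTTP cache a one-day expiry. A minimal sketch of what expire_after does in requests_cache, with a stand-in cache path:

import datetime

import requests_cache

session = requests_cache.CachedSession(
    'output/web_cache',                       # stand-in for OUTPUT_PATH / 'web_cache'
    expire_after=datetime.timedelta(days=1),  # cached responses go stale after one day
)
response = session.get('https://example.org/')
print(response.from_cache)  # False on the first fetch, True for repeat requests within a day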
@@ -31,7 +31,8 @@ def parse_duration(text: str) -> datetime.timedelta:

 def parse_response_datetime(response) -> datetime.datetime:
     return datetime.datetime.strptime(
-        response.headers['Date'], FORMAT_DATE_HEADER,
+        response.headers['Date'],
+        FORMAT_DATE_HEADER,
     ).replace(tzinfo=datetime.UTC)

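Aside: splitting the strptime() arguments is purely cosmetic. For reference, a rough equivalent of parse_response_datetime, assuming FORMAT_DATE_HEADER is the usual RFC 7231 date format (the constant's real value is defined elsewhere and not visible in this diff):

import datetime

FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT'  # assumed value


def parse_date_header(value: str) -> datetime.datetime:
    # Parse an HTTP Date header and mark it explicitly as UTC.
    return datetime.datetime.strptime(value, FORMAT_DATE_HEADER).replace(
        tzinfo=datetime.UTC,
    )


print(parse_date_header('Wed, 21 Oct 2015 07:28:00 GMT'))  # 2015-10-21 07:28:00+00:00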
@@ -1,41 +1,55 @@
 import csv
 import datetime
 import decimal
 import inspect
 import io
-from pathlib import Path
 import logging
-from collections.abc import Iterable, Mapping, Sequence
+import typing
+import urllib.parse
+from collections.abc import Callable, Iterable, Mapping, Sequence
 from decimal import Decimal
+from pathlib import Path

 import requests
 import requests_cache
 from frozendict import frozendict

-from . import notification, data
+from . import data

 logger = logging.getLogger(__name__)

 CSV_DIALECT = 'one_true_dialect'
 csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)

-def try_value(fn, s: str) -> object:
+T = typing.TypeVar('T')
+
+
+def try_value(fn: Callable[[str], T], s: str) -> T | None:
     try:
         return fn(s)
     except (ValueError, decimal.InvalidOperation):
         return None


-def to_value(s: str) -> object:
+def csv_str_to_value(
+    s: str,
+) -> (
+    str
+    | Decimal
+    | datetime.date
+    | datetime.datetime
+    | urllib.parse.ParseResult
+    | bool
+    | None
+):
     s = s.strip()
     if len(s) == 0:
         return None
-    if (v := try_value(Decimal, s)) is not None:
-        return v
-    if v := try_value(datetime.date.fromisoformat, s):
-        return v
-    if v := try_value(datetime.datetime.fromisoformat, s):
-        return v
+    if (v_decimal := try_value(Decimal, s)) is not None:
+        return v_decimal
+    if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
+        return v_date
+    if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
+        return v_datetime
+    if s.startswith(('http://', 'https://')):
+        return urllib.parse.urlparse(s)
     if s.lower() == 'false':
         return False
     if s.lower() == 'true':
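Aside: the hunk above renames to_value to csv_str_to_value, narrows its return annotation, gives try_value a generic signature, and adds URL parsing. Expected behaviour on a few sample inputs, inferred from the visible branches (the 'true' branch is cut off by the hunk boundary):

from personal_data.util import csv_str_to_value

print(repr(csv_str_to_value('42.50')))                  # Decimal('42.50')
print(repr(csv_str_to_value('2024-05-01')))             # datetime.date(2024, 5, 1)
print(repr(csv_str_to_value('https://example.org/x')))  # ParseResult(scheme='https', netloc='example.org', path='/x', ...)
print(repr(csv_str_to_value('false')))                  # False
print(repr(csv_str_to_value('')))                       # None (empty strings are dropped)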
@@ -44,6 +58,11 @@ def to_value(s: str) -> object:
         return None
     return s


+def csv_safe_value(v: object) -> str:
+    if isinstance(v, urllib.parse.ParseResult):
+        return v.geturl()
+    return str(v)
+

 def equals_without_fields(
     a: Mapping[str, object],
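Aside: csv_safe_value exists because str() on a ParseResult yields its repr rather than the original URL, which would corrupt the CSV on the next round trip. A small illustration:

import urllib.parse

url = urllib.parse.urlparse('https://example.org/path?q=1')
print(str(url))      # ParseResult(scheme='https', netloc='example.org', path='/path', params='', query='q=1', fragment='')
print(url.geturl())  # https://example.org/path?q=1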
@@ -116,7 +135,7 @@ def deduplicate_dicts(

 def normalize_dict(d: dict) -> frozendict:
     return frozendict(
-        {k: to_value(str(v)) for k, v in d.items() if to_value(str(v)) is not None},
+        {k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
     )

@@ -127,7 +146,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
    for row in reader:
        for k in list(row.keys()):
            orig = row[k]
-           row[k] = to_value(orig)
+           row[k] = csv_str_to_value(orig)
            if row[k] is None:
                del row[k]
            del k, orig
@@ -136,6 +155,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
    del csvfile
    return dicts


def extend_csv_file(
    csv_file: Path,
    new_dicts: list[dict],
@@ -168,7 +188,9 @@ def extend_csv_file(
    )
    writer.writeheader()
    for d in dicts:
-       writer.writerow(d)
+       writable_d = {k:csv_safe_value(v) for k,v in d.items()}
+       writer.writerow(writable_d)
+       del d, writable_d
    output_csv = csvfile_in_memory.getvalue()
    del writer, csvfile_in_memory

@@ -1,35 +1,57 @@
 import argparse
 import logging
-import icalendar
 import datetime
 import csv
+import urllib.parse
+
+import icalendar
+
+from personal_data.util import load_csv_file
+
+NOW = datetime.datetime.now(tz=datetime.UTC)


 def parse_arguments():
     parser = argparse.ArgumentParser()
     parser.add_argument('data_folder')
     parser.add_argument('output_file')
     return parser.parse_args()


 def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
     cal = icalendar.Calendar()
     cal.add('prodid', '-//personal_data_calendar//example.org//')
     cal.add('version', '2.0')

     for event_data in rows:
-        # Select data
         print(event_data)

+        # Select data
+        possible_time_keys = [
+            k for k, v in event_data.items() if isinstance(v, datetime.date)
+        ]
+        possible_name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
+        possible_image_keys = [
+            k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
+        ]
+
+        date = event_data[possible_time_keys[0]] if possible_time_keys else None
+        title = event_data[possible_name_keys[0]]
+        image = event_data[possible_image_keys[0]] if possible_image_keys else None
+
+        if date is None:
+            continue
+
+        description = '\n\n'.join(event_data[k] for k in possible_name_keys)
+
         # Create event
         event = icalendar.Event()
-        event.add('summary', f'Event {i}')
-        event.add('dtstart', datetime.datetime(2005,4,4,8,0,0,tzinfo=datetime.UTC))
-        event.add('dtend', datetime.datetime(2005,4,4,10,0,0,tzinfo=datetime.UTC))
+        event.add('summary', title)
+        event.add('description', description)
+        event.add('dtstart', date)
+        event.add('dtend', date)
+        event.add('created', NOW)
+        event.add('dtstamp', NOW)
+        if image:
+            event.add('image', image.geturl())
         cal.add_component(event)
         del event

@@ -47,5 +69,6 @@ def main():
    with open(args.output_file, 'wb') as f:
        f.write(calendar.to_ical())


if __name__ == '__main__':
    main()
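Aside: the calendar script now derives the event title, description, dates and image from whatever typed values load_csv_file() produced, instead of hard-coded placeholders. A minimal icalendar round trip in the same style, with made-up data, showing what to_ical() emits:

import datetime

import icalendar

cal = icalendar.Calendar()
cal.add('prodid', '-//example//example.org//')
cal.add('version', '2.0')

event = icalendar.Event()
event.add('summary', 'Example event')
event.add('dtstart', datetime.date(2024, 5, 1))
event.add('dtend', datetime.date(2024, 5, 1))
event.add('dtstamp', datetime.datetime.now(tz=datetime.UTC))
cal.add_component(event)

print(cal.to_ical().decode('utf-8'))  # BEGIN:VCALENDAR ... BEGIN:VEVENT ... END:VCALENDAR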
test/test_main.py (new file, 7 lines)

@@ -0,0 +1,7 @@
+import personal_data.main
+
+
+def test_available():
+    names = personal_data.main.available_scraper_names()
+    assert len(names) > 0
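Aside: the new test only checks that at least one scraper name is discovered. available_scraper_names() presumably rests on the subclass walk started in available_scrapers() further up; the traversal pattern, sketched in isolation with throwaway classes:

class Base:
    pass


class A(Base):
    pass


class B(A):
    pass


def all_subclasses(root: type) -> list[type]:
    # Breadth-first walk over __subclasses__() so indirect subclasses are found too.
    found = []
    queue = [root]
    while queue:
        cls = queue.pop(0)
        for sub in cls.__subclasses__():
            found.append(sub)
            queue.append(sub)
    return found


print(all_subclasses(Base))  # [<class '__main__.A'>, <class '__main__.B'>]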