1
0

Code quality

This commit is contained in:
Jon Michael Aanes 2024-10-25 22:24:33 +02:00
parent 96a2e2bed9
commit 6b6591d96a
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
7 changed files with 33 additions and 28 deletions

View File

@ -1,4 +1,5 @@
import re
from collections.abc import Iterator
import bs4
@ -18,7 +19,7 @@ HTML_TAGS_WITH_LITTLE_CONTENT: set[str] = {
} | HTML_TAGS_MOSTLY_CONTENTLESS
def normalize_text(text: str) -> str:
def normalize_text(text: str) -> bytes:
text = text.replace('\t', ' ')
text = text.replace('\r', '')
text = re.sub(r'\s*\n\s*\n\s*', '\n\n', text)
@ -28,7 +29,7 @@ def normalize_text(text: str) -> str:
return text.encode('utf-8')
def normalize_soup_bs4(soup: bs4.BeautifulSoup) -> bytes:
def normalize_soup_bs4(soup: bs4.BeautifulSoup) -> bs4.BeautifulSoup:
for comment in soup(text=lambda text: isinstance(text, bs4.Comment)):
comment.extract()
del comment
@ -40,7 +41,7 @@ def normalize_soup_bs4(soup: bs4.BeautifulSoup) -> bytes:
return soup
def normalize_soup_lxml(soup) -> bytes:
def normalize_soup_lxml(soup):
for element_name in HTML_TAGS_WITH_LITTLE_CONTENT:
for script_elements in soup.cssselect(element_name):
script_elements.drop_tree()
@ -50,7 +51,6 @@ def normalize_soup_lxml(soup) -> bytes:
def normalize_soup(soup) -> bytes:
text = None
if isinstance(soup, bs4.BeautifulSoup):
text = normalize_soup_bs4(soup).get_text()
else:
@ -58,25 +58,25 @@ def normalize_soup(soup) -> bytes:
return normalize_text(text)
def data_attributes_of_element(e):
def data_attributes_of_element(e) -> Iterator[str]:
for attr_key in list(e.attrs.keys()):
if attr_key.startswith('data-'):
yield attr_key
def has_data_attribute(e) -> bool:
for attr_key in data_attributes_of_element(e):
for _ in data_attributes_of_element(e):
return True
return False
def normalize_soup_slightly(
soup,
soup: bs4.BeautifulSoup,
classes=True,
scripts=True,
comments=True,
data_attributes=True,
):
) -> bs4.BeautifulSoup:
"""Perform soup normalization."""
# Little if any content
for tag in HTML_TAGS_MOSTLY_CONTENTLESS:

View File

@ -13,10 +13,12 @@ FROM_MAIL_USERNAME = 'scrapers'
def send_email(session: requests.Session, subject: str, text: str):
assert isinstance(session, requests.Session)
assert subject != ''
assert text != ''
if subject == '':
msg = 'Subject must not be empty'
raise ValueError(msg)
if text == '':
msg = 'Text must not be empty'
raise ValueError(msg)
logger.info('Sending email using mailgun!')

View File

@ -1,3 +1,4 @@
import datetime
import inspect
import logging
from collections.abc import Sequence
@ -6,8 +7,7 @@ from pathlib import Path
import requests
import requests_cache
from . import data, notification
from .util import *
from . import data, notification, util
logger = logging.getLogger(__name__)
@ -32,7 +32,6 @@ logger.setLevel('INFO')
STANDARD_HEADERS = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
# "Accept": "application/json, text/plain, */*",
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
}
@ -53,8 +52,7 @@ def get_session(
with_cfscrape: bool,
ignore_cache: bool,
) -> requests.Session:
assert isinstance(with_cfscrape, bool)
if cfscrape:
if with_cfscrape and cfscrape:
session_class = CachedCfScrape
if ignore_cache:
logger.warning('HTTP cache disabled')
@ -75,8 +73,6 @@ def get_session(
def available_scrapers() -> list[type[data.Scraper]]:
from . import fetchers # noqa
subclasses = []
class_queue = [data.Scraper]
while class_queue:
@ -131,7 +127,7 @@ def main(
except requests.exceptions.HTTPError:
logger.exception('Failed in running %s', scraper_cls.__name__)
continue
status = extend_csv_file(
status = util.extend_csv_file(
OUTPUT_PATH / f'{scraper.dataset_name}.csv',
result_rows,
deduplicate_mode=scraper.deduplicate_mode,

View File

@ -9,7 +9,6 @@ from . import mailgun
logger = logging.getLogger(__name__)
SOUND_PATH = 'resource/sound/57808__guitarguy1985__carterattack.mp3'
# SOUND_PATH = 'resource/sound/516855__matrixxx__wake-up-01.wav'
class NotificationType(enum.Enum):
@ -29,9 +28,9 @@ def send_email_notification(
def play_loud_and_annoying_sound(
session: requests.Session,
scraper_name: str,
latest_dict: frozendict,
_session: requests.Session,
_scraper_name: str,
_latest_dict: frozendict,
) -> None:
import playsound3

View File

@ -40,10 +40,10 @@ LOCAL_TIMEZONE = NOW.astimezone().tzinfo
def try_parse(text: str, fmt: str) -> datetime.datetime | None:
try:
time = datetime.datetime.strptime(text, fmt)
time = datetime.datetime.strptime(text, fmt) # noqa: DTZ007
if time.tzinfo is None:
time = time.replace(tzinfo=LOCAL_TIMEZONE)
except:
except ValueError:
time = None
return time

View File

@ -72,7 +72,7 @@ def deduplicate_dicts(
fieldnames = []
for d in dicts:
for k in d.keys():
for k in d:
if k not in fieldnames:
fieldnames.append(k)
del k

View File

@ -1,6 +1,14 @@
from personal_data.parse_util import parse_time
import datetime
from personal_data.parse_util import parse_date, parse_time
def test_parse_tme():
assert parse_time('06 Apr 2024 06:11:42 PM')
assert parse_time('26 Mar 2024 7:07:01 PM')
def test_parse_date():
assert parse_date('6 April 2024') == datetime.date(2024, 4, 6)
assert parse_date('April 6, 2024') == datetime.date(2024, 4, 6)
assert parse_date('Apr 6, 2024') == datetime.date(2024, 4, 6)