From 3f0ab40982459ffc44903b56ff78d269c9b8a5b4 Mon Sep 17 00:00:00 2001
From: Jon Michael Aanes
Date: Thu, 3 Oct 2024 23:24:12 +0200
Subject: [PATCH] Ruff

---
 obsidian_import/__init__.py                | 14 +++--
 obsidian_import/__main__.py                |  3 +-
 obsidian_import/obsidian.py                | 42 +++++++++-----
 personal_data/fetchers/defi_kucoin.py      |  1 +
 personal_data/fetchers/ffxiv_lodestone.py  |  7 ++-
 .../fetchers/jellyfin_watch_history.py     | 55 +++++++++++--------
 personal_data/fetchers/psnprofiles.py      |  5 +-
 personal_data/fetchers/steam_community.py  | 22 +++++---
 personal_data/fetchers/tavex.py            |  5 +-
 personal_data/main.py                      |  9 ++-
 personal_data/parse_util.py                |  2 +-
 personal_data/util.py                      | 16 ++++--
 personal_data_calendar/__main__.py         |  5 +-
 test/test_main.py                          |  1 -
 14 files changed, 116 insertions(+), 71 deletions(-)

diff --git a/obsidian_import/__init__.py b/obsidian_import/__init__.py
index e4ba2ee..3aa01ce 100644
--- a/obsidian_import/__init__.py
+++ b/obsidian_import/__init__.py
@@ -3,15 +3,19 @@
 Sub-module for importing time-based data into Obsidian.
 """
 
-from pathlib import Path
-from .obsidian import ObsidianVault
-from personal_data.util import load_csv_file
 import datetime
 from logging import getLogger
+from pathlib import Path
+
+from personal_data.util import load_csv_file
+
+from .obsidian import ObsidianVault
+
 logger = getLogger(__name__)
 
-def import_data(obsidian_path: Path, dry_run = True):
-    vault = ObsidianVault(obsidian_path, read_only = dry_run and 'silent' or None)
+
+def import_data(obsidian_path: Path, dry_run=True):
+    vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
     data_path = Path('/home/jmaa/Notes/workout.csv')
     rows = load_csv_file(data_path)
 
diff --git a/obsidian_import/__main__.py b/obsidian_import/__main__.py
index 81405a2..ea8d552 100644
--- a/obsidian_import/__main__.py
+++ b/obsidian_import/__main__.py
@@ -6,6 +6,7 @@ from . import import_data
 
 logger = logging.getLogger(__name__)
 
+
 def parse_arguments():
     parser = argparse.ArgumentParser()
     parser.add_argument('--vault', type=Path, required=True)
@@ -21,7 +22,7 @@ def main():
     args = parse_arguments()
     if args.dry_run:
         logger.warning('Dry run')
-    import_data(args.vault, dry_run = args.dry_run)
+    import_data(args.vault, dry_run=args.dry_run)
     if args.dry_run:
         logger.warning('Dry run: Use --yes to execute')
diff --git a/obsidian_import/obsidian.py b/obsidian_import/obsidian.py
index 0e804e8..a65d2fb 100644
--- a/obsidian_import/obsidian.py
+++ b/obsidian_import/obsidian.py
@@ -1,31 +1,33 @@
-
 import datetime
-from typing import Any
 import json
-from pathlib import Path
-
-import frontmatter
 from decimal import Decimal
 from logging import getLogger
+from pathlib import Path
+from typing import Any
+
+import frontmatter
+
 logger = getLogger(__name__)
 
 StatisticKey = str
 
-class ObsidianVault:
-    def __init__(self, vault_path : Path, read_only: bool = 'silent'):
-        self.vault_path = vault_path
+class ObsidianVault:
+    def __init__(self, vault_path: Path, read_only: bool = 'silent'):
+        self.vault_path = vault_path
         assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
-        with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
+        with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
             daily_notes_config = json.load(f)
         self.daily_folder = daily_notes_config['folder']
         self.path_format = daily_notes_config['format']
         self.template_file_path = daily_notes_config['template']
         self.read_only = read_only
 
-    def get_statistic(self, date: datetime.date, statistic_key: StatisticKey) -> Any | None:
+    def get_statistic(
+        self, date: datetime.date, statistic_key: StatisticKey,
+    ) -> Any | None:
         try:
             with open(self._date_file_path(date)) as f:
                 data = frontmatter.load(f)
@@ -34,9 +36,15 @@ class ObsidianVault:
 
         return data.metadata.get(statistic_key)
 
-    def add_statistic(self, date: datetime.date, statistic_key: StatisticKey, amount: Any) -> bool:
+    def add_statistic(
+        self, date: datetime.date, statistic_key: StatisticKey, amount: Any,
+    ) -> bool:
         if self.read_only == 'silent':
-            logger.info('Real only ObsidianVault ignoring add_statistic(%s, "%s", ?)', date, statistic_key)
+            logger.info(
+                'Real only ObsidianVault ignoring add_statistic(%s, "%s", ?)',
+                date,
+                statistic_key,
+            )
             return False
 
         self._create_date_if_not_present(date)
@@ -59,7 +67,9 @@ class ObsidianVault:
 
     def add_event(self, date: datetime.date, verb: str, subject: str) -> None:
         if self.read_only == 'silent':
-            logger.info('Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb)
+            logger.info(
+                'Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb,
+            )
             return
 
         self._create_date_if_not_present(date)
@@ -76,7 +86,11 @@ class ObsidianVault:
             f.write(template_text)
 
     def _date_file_path(self, date: datetime.date):
-        path = self.path_format.replace('YYYY', str(date.year)).replace('MM', '{:02d}'.format(date.month)).replace('DD', '{:02d}'.format(date.day))
+        path = (
+            self.path_format.replace('YYYY', str(date.year))
+            .replace('MM', f'{date.month:02d}')
+            .replace('DD', f'{date.day:02d}')
+        )
         return (self.vault_path / self.daily_folder / path).with_suffix('.md')
 
     def _daily_template_path(self):
diff --git a/personal_data/fetchers/defi_kucoin.py b/personal_data/fetchers/defi_kucoin.py
index 5bcfc84..5aba22c 100644
--- a/personal_data/fetchers/defi_kucoin.py
+++ b/personal_data/fetchers/defi_kucoin.py
@@ -12,6 +12,7 @@ from .. import secrets
 
 logger = logging.getLogger(__name__)
 
+
 def get_client():
     assert secrets.KUCOIN_KEY, 'Missing secret: KUCOIN_KEY'
     assert secrets.KUCOIN_SECRET, 'Missing secret: KUCOIN_SECRET'
diff --git a/personal_data/fetchers/ffxiv_lodestone.py b/personal_data/fetchers/ffxiv_lodestone.py
index 5128ada..a585115 100644
--- a/personal_data/fetchers/ffxiv_lodestone.py
+++ b/personal_data/fetchers/ffxiv_lodestone.py
@@ -2,10 +2,11 @@ import dataclasses
 import datetime
 import logging
 import re
+
 import bs4
 
+from .. import html_util, parse_util, secrets
 from ..data import DeduplicateMode, Scraper
-from .. import secrets, parse_util, html_util
 
 logger = logging.getLogger(__name__)
 
@@ -51,7 +52,9 @@ class LodestoneAchievementScraper(Scraper):
                 time_acquired,
             ).group(1)
             time_acquired = int(time_acquired)
-            time_acquired = datetime.datetime.fromtimestamp(time_acquired,tz=datetime.UTC)
+            time_acquired = datetime.datetime.fromtimestamp(
+                time_acquired, tz=datetime.UTC,
+            )
             trophy_desc = (
                 entry.select_one('.entry__activity__txt').get_text().strip()
             )
diff --git a/personal_data/fetchers/jellyfin_watch_history.py b/personal_data/fetchers/jellyfin_watch_history.py
index 7b5e4b1..f2e75fe 100644
--- a/personal_data/fetchers/jellyfin_watch_history.py
+++ b/personal_data/fetchers/jellyfin_watch_history.py
@@ -1,42 +1,47 @@
 import dataclasses
-import datetime
 import logging
-import re
-import bs4
-from typing import Any
 from collections.abc import Iterator
+from typing import Any
+
 from jellyfin_apiclient_python import JellyfinClient
 
+from .. import _version, secrets
 from ..data import DeduplicateMode, Scraper
-from .. import secrets, parse_util, html_util, _version
 
 logger = logging.getLogger(__name__)
 
 URL_SITE_ROOT = 'https://steamcommunity.com/'
-URL_GAME_ACHIVEMENTS = URL_SITE_ROOT+'id/{username}/stats/appid/{appid}'
+URL_GAME_ACHIVEMENTS = URL_SITE_ROOT + 'id/{username}/stats/appid/{appid}'
 
 FORMAT_DATE_HEADER = '%d/%m/%YYYY'
 
+
 def iterate_series(client):
-    result = client.jellyfin.user_items(params = {
-        'includeItemTypes': 'Series',
-        'parentId': 'a656b907eb3a73532e40e44b968d0225',
-        'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
-    })
+    result = client.jellyfin.user_items(
+        params={
+            'includeItemTypes': 'Series',
+            'parentId': 'a656b907eb3a73532e40e44b968d0225',
+            'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
+        },
+    )
     yield from result['Items']
 
+
 def iterate_watched_episodes_of_series(client, series_id: str):
-    result = client.jellyfin.user_items(params = {
-        'filters': 'IsPlayed',
-        'recursive': True,
-        'includeItemTypes': 'Episode',
-        'parentId': series_id,
-        'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
-        'fields': 'AirTime',
-    })
+    result = client.jellyfin.user_items(
+        params={
+            'filters': 'IsPlayed',
+            'recursive': True,
+            'includeItemTypes': 'Episode',
+            'parentId': series_id,
+            'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
+            'fields': 'AirTime',
+        },
+    )
     yield from result['Items']
 
+
 @dataclasses.dataclass(frozen=True)
 class JellyfinWatchHistoryScraper(Scraper):
     dataset_name = 'show_episodes_watched'
@@ -45,12 +50,15 @@ class JellyfinWatchHistoryScraper(Scraper):
 
     def scrape(self) -> Iterator[dict[str, Any]]:
         client = JellyfinClient()
-        client.config.app('personal_data', _version.__version__,
-                          'test_machine', 'unique_id_1')
+        client.config.app(
+            'personal_data', _version.__version__, 'test_machine', 'unique_id_1',
+        )
 
-        client.config.data["auth.ssl"] = False
+        client.config.data['auth.ssl'] = False
         client.auth.connect_to_address(secrets.JELLYFIN_URL)
-        client.auth.login(secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD)
+        client.auth.login(
+            secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD,
+        )
 
         for series_data in iterate_series(client):
             series_id = series_data['Id']
@@ -70,4 +78,3 @@ class JellyfinWatchHistoryScraper(Scraper):
                 del episode_data
 
             del series_data, series_id
-
diff --git a/personal_data/fetchers/psnprofiles.py b/personal_data/fetchers/psnprofiles.py
index cabfa77..42d6fc4 100644
--- a/personal_data/fetchers/psnprofiles.py
+++ b/personal_data/fetchers/psnprofiles.py
@@ -4,13 +4,13 @@ import re
 from collections.abc import Iterator
 
 import bs4
+import requests_util
 
 import personal_data.html_util
 from personal_data import secrets
 from personal_data.data import DeduplicateMode, Scraper
 
 from .. import parse_util
-import requests_util
 
 logger = logging.getLogger(__name__)
 
@@ -27,6 +27,7 @@ def game_psnprofiles_id_from_url(relative_url: str) -> int:
 
 MAX_NUMBER_GAMES_TO_PARSE = 1000
 
+
 @dataclasses.dataclass(frozen=True)
 class PsnProfilesScraper(Scraper):
     dataset_name = 'games_played_playstation'
@@ -57,7 +58,7 @@ class PsnProfilesScraper(Scraper):
         requests_util.setup_limiter(
             self.session,
             URL_API_ROOT,
-            per_minute = 5,
+            per_minute=5,
         )
 
     def _scrape_games_overview(self) -> Iterator[dict]:
diff --git a/personal_data/fetchers/steam_community.py b/personal_data/fetchers/steam_community.py
index a975e33..8369acc 100644
--- a/personal_data/fetchers/steam_community.py
+++ b/personal_data/fetchers/steam_community.py
@@ -2,18 +2,19 @@ import dataclasses
 import datetime
 import logging
 import re
-import bs4
-from typing import Any
 from collections.abc import Iterator
+from typing import Any
 
+import bs4
+
+from .. import html_util, parse_util, secrets
 from ..data import DeduplicateMode, Scraper
-from .. import secrets, parse_util, html_util
 
 logger = logging.getLogger(__name__)
 
 URL_SITE_ROOT = 'https://steamcommunity.com/'
-URL_GAME_ACHIVEMENTS = URL_SITE_ROOT+'id/{username}/stats/appid/{appid}'
+URL_GAME_ACHIVEMENTS = URL_SITE_ROOT + 'id/{username}/stats/appid/{appid}'
 
 FORMAT_DATE_HEADER = '%d/%m/%YYYY'
 
@@ -28,8 +29,8 @@ class SteamAchievementScraper(Scraper):
 
     def scrape_app(self, appid: int) -> Iterator[dict[str, Any]]:
         url = URL_GAME_ACHIVEMENTS.format(
-                username=secrets.STEAM_USERNAME,
-                appid=appid,
+            username=secrets.STEAM_USERNAME,
+            appid=appid,
         )
         response = self.session.get(url)
         response.raise_for_status()
@@ -39,14 +40,15 @@ class SteamAchievementScraper(Scraper):
 
         # Parse data
         soup = bs4.BeautifulSoup(response.content, 'lxml')
-        game_name: str = re.match(r'Steam Community :: (.+) :: Jmaa', soup.head.title.get_text()).group(1)
+        game_name: str = re.match(
+            r'Steam Community :: (.+) :: Jmaa', soup.head.title.get_text(),
+        ).group(1)
 
         soup = html_util.normalize_soup_slightly(
             soup,
             classes=False,
         )
-
         for entry in soup.select('.achieveRow'):
             trophy_name: str = entry.select_one('h3').get_text()
             trophy_desc: str = entry.select_one('h5').get_text()
@@ -55,7 +57,9 @@ class SteamAchievementScraper(Scraper):
             time_acquired_html: str = entry.select_one('.achieveUnlockTime')
             if time_acquired_html is None:
                 continue
-            time_acquired_text: str = time_acquired_html.get_text().strip().removeprefix('Unlocked ')
+            time_acquired_text: str = (
+                time_acquired_html.get_text().strip().removeprefix('Unlocked ')
+            )
             time_acquired: datetime.datetime = parse_util.parse_time(time_acquired_text)
 
             yield {
diff --git a/personal_data/fetchers/tavex.py b/personal_data/fetchers/tavex.py
index 3b5ea50..001d401 100644
--- a/personal_data/fetchers/tavex.py
+++ b/personal_data/fetchers/tavex.py
@@ -8,14 +8,15 @@ import dataclasses
 from decimal import Decimal
 
 import bs4
+import requests_util
 
 import personal_data.html_util
 import personal_data.parse_util
 from personal_data.data import DeduplicateMode, Scraper
-import requests_util
 
 URL_API_ROOT = 'https://tavex.dk/'
 
+
 def parse_dkk_price(dkk: str) -> Decimal:
     if dkk.strip() == '-':
         return None
@@ -38,7 +39,7 @@ class TavexScraperBase(Scraper):
         requests_util.setup_limiter(
             self.session,
             URL_API_ROOT,
-            per_minute = 5,
+            per_minute=5,
         )
 
     def scrape(self):
diff --git a/personal_data/main.py b/personal_data/main.py
index 3cd2110..172ec8b 100644
--- a/personal_data/main.py
+++ b/personal_data/main.py
@@ -43,8 +43,10 @@ if cfscrape:
     class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
         pass
 
+
 CACHE_EXPIRE_DEFAULT = datetime.timedelta(days=7)
 
+
 def get_session(
     cookiejar: Sequence,
     *,
@@ -58,14 +60,17 @@ def get_session(
         return requests.Session()
     if cfscrape:
         session_class = CachedCfScrape
-    session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT)
+    session = session_class(
+        OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT,
+    )
     for cookie in cookiejar:
         session.cookies.set_cookie(cookie)
     return session
 
 
 def available_scrapers() -> list[type[data.Scraper]]:
-    from . import fetchers # noqa
+    from . import fetchers  # noqa
+
     subclasses = []
     class_queue = [data.Scraper]
     while class_queue:
diff --git a/personal_data/parse_util.py b/personal_data/parse_util.py
index 4f42bc2..5e68379 100644
--- a/personal_data/parse_util.py
+++ b/personal_data/parse_util.py
@@ -39,7 +39,7 @@ def parse_response_datetime(response) -> datetime.datetime:
 
 LOCAL_TIMEZONE = datetime.datetime.now(datetime.UTC).astimezone().tzinfo
 
-def try_parse(text:str, fmt:str) -> datetime.datetime | None:
+def try_parse(text: str, fmt: str) -> datetime.datetime | None:
     try:
         time = datetime.datetime.strptime(text, fmt)
         if time.tzinfo is None:
diff --git a/personal_data/util.py b/personal_data/util.py
index e43ae63..5107527 100644
--- a/personal_data/util.py
+++ b/personal_data/util.py
@@ -108,10 +108,10 @@ def deduplicate_by_ignoring_certain_fields(
 
 
 def deduplicate_dicts(
-    dicts: Sequence[dict[str,typing.Any] | frozendict[str,typing.Any]],
+    dicts: Sequence[dict[str, typing.Any] | frozendict[str, typing.Any]],
     deduplicate_mode: data.DeduplicateMode,
     deduplicate_ignore_columns: list[str],
-) -> tuple[Sequence[dict[str,typing.Any]], list[str]]:
+) -> tuple[Sequence[dict[str, typing.Any]], list[str]]:
     assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
 
     fieldnames = []
@@ -141,9 +141,13 @@ def deduplicate_dicts(
     return dicts, fieldnames
 
 
-def normalize_dict(d: dict[str,typing.Any]) -> frozendict[str,typing.Any]:
+def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
     return frozendict(
-        {k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
+        {
+            k: csv_str_to_value(str(v))
+            for k, v in d.items()
+            if csv_str_to_value(str(v)) is not None
+        },
     )
 
 
@@ -168,7 +172,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
 
 def extend_csv_file(
     csv_file: Path,
-    new_dicts: list[dict[str,typing.Any]],
+    new_dicts: list[dict[str, typing.Any]],
     deduplicate_mode: data.DeduplicateMode,
     deduplicate_ignore_columns: list[str],
 ) -> dict:
@@ -198,7 +202,7 @@ def extend_csv_file(
     )
     writer.writeheader()
     for d in dicts:
-        writable_d = {k:csv_safe_value(v) for k,v in d.items()}
+        writable_d = {k: csv_safe_value(v) for k, v in d.items()}
         writer.writerow(writable_d)
         del d, writable_d
     output_csv = csvfile_in_memory.getvalue()
diff --git a/personal_data_calendar/__main__.py b/personal_data_calendar/__main__.py
index a2b6a64..118eb9e 100644
--- a/personal_data_calendar/__main__.py
+++ b/personal_data_calendar/__main__.py
@@ -24,7 +24,6 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
     cal.add('version', '2.0')
 
     for event_data in rows:
-
         # Select data
         possible_time_keys = [
             k for k, v in event_data.items() if isinstance(v, datetime.date)
@@ -41,7 +40,9 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
             continue
 
         title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
-        description = '\n\n'.join(event_data[k] for k in possible_name_keys[max_title_parts:])
+        description = '\n\n'.join(
+            event_data[k] for k in possible_name_keys[max_title_parts:]
+        )
 
         # Create event
         event = icalendar.Event()
diff --git a/test/test_main.py b/test/test_main.py
index d78e64b..79f80cc 100644
--- a/test/test_main.py
+++ b/test/test_main.py
@@ -4,4 +4,3 @@ import personal_data.main
 def test_available():
     names = personal_data.main.available_scraper_names()
     assert len(names) > 0
-