1
0
This commit is contained in:
Jon Michael Aanes 2024-10-03 23:24:12 +02:00
parent 207b6cec67
commit 3f0ab40982
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
14 changed files with 116 additions and 71 deletions

View File

@ -3,15 +3,19 @@
Sub-module for importing time-based data into Obsidian. Sub-module for importing time-based data into Obsidian.
""" """
from pathlib import Path
from .obsidian import ObsidianVault
from personal_data.util import load_csv_file
import datetime import datetime
from logging import getLogger from logging import getLogger
from pathlib import Path
from personal_data.util import load_csv_file
from .obsidian import ObsidianVault
logger = getLogger(__name__) logger = getLogger(__name__)
def import_data(obsidian_path: Path, dry_run = True):
vault = ObsidianVault(obsidian_path, read_only = dry_run and 'silent' or None) def import_data(obsidian_path: Path, dry_run=True):
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
data_path = Path('/home/jmaa/Notes/workout.csv') data_path = Path('/home/jmaa/Notes/workout.csv')
rows = load_csv_file(data_path) rows = load_csv_file(data_path)

View File

@ -6,6 +6,7 @@ from . import import_data
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def parse_arguments(): def parse_arguments():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--vault', type=Path, required=True) parser.add_argument('--vault', type=Path, required=True)
@ -21,7 +22,7 @@ def main():
args = parse_arguments() args = parse_arguments()
if args.dry_run: if args.dry_run:
logger.warning('Dry run') logger.warning('Dry run')
import_data(args.vault, dry_run = args.dry_run) import_data(args.vault, dry_run=args.dry_run)
if args.dry_run: if args.dry_run:
logger.warning('Dry run: Use --yes to execute') logger.warning('Dry run: Use --yes to execute')

View File

@ -1,19 +1,19 @@
import datetime import datetime
from typing import Any
import json import json
from pathlib import Path
import frontmatter
from decimal import Decimal from decimal import Decimal
from logging import getLogger from logging import getLogger
from pathlib import Path
from typing import Any
import frontmatter
logger = getLogger(__name__) logger = getLogger(__name__)
StatisticKey = str StatisticKey = str
class ObsidianVault:
def __init__(self, vault_path : Path, read_only: bool = 'silent'): class ObsidianVault:
def __init__(self, vault_path: Path, read_only: bool = 'silent'):
self.vault_path = vault_path self.vault_path = vault_path
assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault' assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
@ -25,7 +25,9 @@ class ObsidianVault:
self.template_file_path = daily_notes_config['template'] self.template_file_path = daily_notes_config['template']
self.read_only = read_only self.read_only = read_only
def get_statistic(self, date: datetime.date, statistic_key: StatisticKey) -> Any | None: def get_statistic(
self, date: datetime.date, statistic_key: StatisticKey,
) -> Any | None:
try: try:
with open(self._date_file_path(date)) as f: with open(self._date_file_path(date)) as f:
data = frontmatter.load(f) data = frontmatter.load(f)
@ -34,9 +36,15 @@ class ObsidianVault:
return data.metadata.get(statistic_key) return data.metadata.get(statistic_key)
def add_statistic(self, date: datetime.date, statistic_key: StatisticKey, amount: Any) -> bool: def add_statistic(
self, date: datetime.date, statistic_key: StatisticKey, amount: Any,
) -> bool:
if self.read_only == 'silent': if self.read_only == 'silent':
logger.info('Real only ObsidianVault ignoring add_statistic(%s, "%s", ?)', date, statistic_key) logger.info(
'Real only ObsidianVault ignoring add_statistic(%s, "%s", ?)',
date,
statistic_key,
)
return False return False
self._create_date_if_not_present(date) self._create_date_if_not_present(date)
@ -59,7 +67,9 @@ class ObsidianVault:
def add_event(self, date: datetime.date, verb: str, subject: str) -> None: def add_event(self, date: datetime.date, verb: str, subject: str) -> None:
if self.read_only == 'silent': if self.read_only == 'silent':
logger.info('Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb) logger.info(
'Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb,
)
return return
self._create_date_if_not_present(date) self._create_date_if_not_present(date)
@ -76,7 +86,11 @@ class ObsidianVault:
f.write(template_text) f.write(template_text)
def _date_file_path(self, date: datetime.date): def _date_file_path(self, date: datetime.date):
path = self.path_format.replace('YYYY', str(date.year)).replace('MM', '{:02d}'.format(date.month)).replace('DD', '{:02d}'.format(date.day)) path = (
self.path_format.replace('YYYY', str(date.year))
.replace('MM', f'{date.month:02d}')
.replace('DD', f'{date.day:02d}')
)
return (self.vault_path / self.daily_folder / path).with_suffix('.md') return (self.vault_path / self.daily_folder / path).with_suffix('.md')
def _daily_template_path(self): def _daily_template_path(self):

View File

@ -12,6 +12,7 @@ from .. import secrets
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_client(): def get_client():
assert secrets.KUCOIN_KEY, 'Missing secret: KUCOIN_KEY' assert secrets.KUCOIN_KEY, 'Missing secret: KUCOIN_KEY'
assert secrets.KUCOIN_SECRET, 'Missing secret: KUCOIN_SECRET' assert secrets.KUCOIN_SECRET, 'Missing secret: KUCOIN_SECRET'

View File

@ -2,10 +2,11 @@ import dataclasses
import datetime import datetime
import logging import logging
import re import re
import bs4 import bs4
from .. import html_util, parse_util, secrets
from ..data import DeduplicateMode, Scraper from ..data import DeduplicateMode, Scraper
from .. import secrets, parse_util, html_util
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -51,7 +52,9 @@ class LodestoneAchievementScraper(Scraper):
time_acquired, time_acquired,
).group(1) ).group(1)
time_acquired = int(time_acquired) time_acquired = int(time_acquired)
time_acquired = datetime.datetime.fromtimestamp(time_acquired,tz=datetime.UTC) time_acquired = datetime.datetime.fromtimestamp(
time_acquired, tz=datetime.UTC,
)
trophy_desc = ( trophy_desc = (
entry.select_one('.entry__activity__txt').get_text().strip() entry.select_one('.entry__activity__txt').get_text().strip()
) )

View File

@ -1,42 +1,47 @@
import dataclasses import dataclasses
import datetime
import logging import logging
import re
import bs4
from typing import Any
from collections.abc import Iterator from collections.abc import Iterator
from typing import Any
from jellyfin_apiclient_python import JellyfinClient from jellyfin_apiclient_python import JellyfinClient
from .. import _version, secrets
from ..data import DeduplicateMode, Scraper from ..data import DeduplicateMode, Scraper
from .. import secrets, parse_util, html_util, _version
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
URL_SITE_ROOT = 'https://steamcommunity.com/' URL_SITE_ROOT = 'https://steamcommunity.com/'
URL_GAME_ACHIVEMENTS = URL_SITE_ROOT+'id/{username}/stats/appid/{appid}' URL_GAME_ACHIVEMENTS = URL_SITE_ROOT + 'id/{username}/stats/appid/{appid}'
FORMAT_DATE_HEADER = '%d/%m/%YYYY' FORMAT_DATE_HEADER = '%d/%m/%YYYY'
def iterate_series(client): def iterate_series(client):
result = client.jellyfin.user_items(params = { result = client.jellyfin.user_items(
params={
'includeItemTypes': 'Series', 'includeItemTypes': 'Series',
'parentId': 'a656b907eb3a73532e40e44b968d0225', 'parentId': 'a656b907eb3a73532e40e44b968d0225',
'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab', 'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
}) },
)
yield from result['Items'] yield from result['Items']
def iterate_watched_episodes_of_series(client, series_id: str): def iterate_watched_episodes_of_series(client, series_id: str):
result = client.jellyfin.user_items(params = { result = client.jellyfin.user_items(
params={
'filters': 'IsPlayed', 'filters': 'IsPlayed',
'recursive': True, 'recursive': True,
'includeItemTypes': 'Episode', 'includeItemTypes': 'Episode',
'parentId': series_id, 'parentId': series_id,
'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab', 'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
'fields': 'AirTime', 'fields': 'AirTime',
}) },
)
yield from result['Items'] yield from result['Items']
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class JellyfinWatchHistoryScraper(Scraper): class JellyfinWatchHistoryScraper(Scraper):
dataset_name = 'show_episodes_watched' dataset_name = 'show_episodes_watched'
@ -45,12 +50,15 @@ class JellyfinWatchHistoryScraper(Scraper):
def scrape(self) -> Iterator[dict[str, Any]]: def scrape(self) -> Iterator[dict[str, Any]]:
client = JellyfinClient() client = JellyfinClient()
client.config.app('personal_data', _version.__version__, client.config.app(
'test_machine', 'unique_id_1') 'personal_data', _version.__version__, 'test_machine', 'unique_id_1',
)
client.config.data["auth.ssl"] = False client.config.data['auth.ssl'] = False
client.auth.connect_to_address(secrets.JELLYFIN_URL) client.auth.connect_to_address(secrets.JELLYFIN_URL)
client.auth.login(secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD) client.auth.login(
secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD,
)
for series_data in iterate_series(client): for series_data in iterate_series(client):
series_id = series_data['Id'] series_id = series_data['Id']
@ -70,4 +78,3 @@ class JellyfinWatchHistoryScraper(Scraper):
del episode_data del episode_data
del series_data, series_id del series_data, series_id

View File

@ -4,13 +4,13 @@ import re
from collections.abc import Iterator from collections.abc import Iterator
import bs4 import bs4
import requests_util
import personal_data.html_util import personal_data.html_util
from personal_data import secrets from personal_data import secrets
from personal_data.data import DeduplicateMode, Scraper from personal_data.data import DeduplicateMode, Scraper
from .. import parse_util from .. import parse_util
import requests_util
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -27,6 +27,7 @@ def game_psnprofiles_id_from_url(relative_url: str) -> int:
MAX_NUMBER_GAMES_TO_PARSE = 1000 MAX_NUMBER_GAMES_TO_PARSE = 1000
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class PsnProfilesScraper(Scraper): class PsnProfilesScraper(Scraper):
dataset_name = 'games_played_playstation' dataset_name = 'games_played_playstation'
@ -57,7 +58,7 @@ class PsnProfilesScraper(Scraper):
requests_util.setup_limiter( requests_util.setup_limiter(
self.session, self.session,
URL_API_ROOT, URL_API_ROOT,
per_minute = 5, per_minute=5,
) )
def _scrape_games_overview(self) -> Iterator[dict]: def _scrape_games_overview(self) -> Iterator[dict]:

View File

@ -2,18 +2,19 @@ import dataclasses
import datetime import datetime
import logging import logging
import re import re
import bs4
from typing import Any
from collections.abc import Iterator from collections.abc import Iterator
from typing import Any
import bs4
from .. import html_util, parse_util, secrets
from ..data import DeduplicateMode, Scraper from ..data import DeduplicateMode, Scraper
from .. import secrets, parse_util, html_util
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
URL_SITE_ROOT = 'https://steamcommunity.com/' URL_SITE_ROOT = 'https://steamcommunity.com/'
URL_GAME_ACHIVEMENTS = URL_SITE_ROOT+'id/{username}/stats/appid/{appid}' URL_GAME_ACHIVEMENTS = URL_SITE_ROOT + 'id/{username}/stats/appid/{appid}'
FORMAT_DATE_HEADER = '%d/%m/%YYYY' FORMAT_DATE_HEADER = '%d/%m/%YYYY'
@ -39,14 +40,15 @@ class SteamAchievementScraper(Scraper):
# Parse data # Parse data
soup = bs4.BeautifulSoup(response.content, 'lxml') soup = bs4.BeautifulSoup(response.content, 'lxml')
game_name: str = re.match(r'Steam Community :: (.+) :: Jmaa', soup.head.title.get_text()).group(1) game_name: str = re.match(
r'Steam Community :: (.+) :: Jmaa', soup.head.title.get_text(),
).group(1)
soup = html_util.normalize_soup_slightly( soup = html_util.normalize_soup_slightly(
soup, soup,
classes=False, classes=False,
) )
for entry in soup.select('.achieveRow'): for entry in soup.select('.achieveRow'):
trophy_name: str = entry.select_one('h3').get_text() trophy_name: str = entry.select_one('h3').get_text()
trophy_desc: str = entry.select_one('h5').get_text() trophy_desc: str = entry.select_one('h5').get_text()
@ -55,7 +57,9 @@ class SteamAchievementScraper(Scraper):
time_acquired_html: str = entry.select_one('.achieveUnlockTime') time_acquired_html: str = entry.select_one('.achieveUnlockTime')
if time_acquired_html is None: if time_acquired_html is None:
continue continue
time_acquired_text: str = time_acquired_html.get_text().strip().removeprefix('Unlocked ') time_acquired_text: str = (
time_acquired_html.get_text().strip().removeprefix('Unlocked ')
)
time_acquired: datetime.datetime = parse_util.parse_time(time_acquired_text) time_acquired: datetime.datetime = parse_util.parse_time(time_acquired_text)
yield { yield {

View File

@ -8,14 +8,15 @@ import dataclasses
from decimal import Decimal from decimal import Decimal
import bs4 import bs4
import requests_util
import personal_data.html_util import personal_data.html_util
import personal_data.parse_util import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper from personal_data.data import DeduplicateMode, Scraper
import requests_util
URL_API_ROOT = 'https://tavex.dk/' URL_API_ROOT = 'https://tavex.dk/'
def parse_dkk_price(dkk: str) -> Decimal: def parse_dkk_price(dkk: str) -> Decimal:
if dkk.strip() == '-': if dkk.strip() == '-':
return None return None
@ -38,7 +39,7 @@ class TavexScraperBase(Scraper):
requests_util.setup_limiter( requests_util.setup_limiter(
self.session, self.session,
URL_API_ROOT, URL_API_ROOT,
per_minute = 5, per_minute=5,
) )
def scrape(self): def scrape(self):

View File

@ -43,8 +43,10 @@ if cfscrape:
class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper): class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
pass pass
CACHE_EXPIRE_DEFAULT = datetime.timedelta(days=7) CACHE_EXPIRE_DEFAULT = datetime.timedelta(days=7)
def get_session( def get_session(
cookiejar: Sequence, cookiejar: Sequence,
*, *,
@ -58,7 +60,9 @@ def get_session(
return requests.Session() return requests.Session()
if cfscrape: if cfscrape:
session_class = CachedCfScrape session_class = CachedCfScrape
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT) session = session_class(
OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT,
)
for cookie in cookiejar: for cookie in cookiejar:
session.cookies.set_cookie(cookie) session.cookies.set_cookie(cookie)
return session return session
@ -66,6 +70,7 @@ def get_session(
def available_scrapers() -> list[type[data.Scraper]]: def available_scrapers() -> list[type[data.Scraper]]:
from . import fetchers # noqa from . import fetchers # noqa
subclasses = [] subclasses = []
class_queue = [data.Scraper] class_queue = [data.Scraper]
while class_queue: while class_queue:

View File

@ -39,7 +39,7 @@ def parse_response_datetime(response) -> datetime.datetime:
LOCAL_TIMEZONE = datetime.datetime.now(datetime.UTC).astimezone().tzinfo LOCAL_TIMEZONE = datetime.datetime.now(datetime.UTC).astimezone().tzinfo
def try_parse(text:str, fmt:str) -> datetime.datetime | None: def try_parse(text: str, fmt: str) -> datetime.datetime | None:
try: try:
time = datetime.datetime.strptime(text, fmt) time = datetime.datetime.strptime(text, fmt)
if time.tzinfo is None: if time.tzinfo is None:

View File

@ -108,10 +108,10 @@ def deduplicate_by_ignoring_certain_fields(
def deduplicate_dicts( def deduplicate_dicts(
dicts: Sequence[dict[str,typing.Any] | frozendict[str,typing.Any]], dicts: Sequence[dict[str, typing.Any] | frozendict[str, typing.Any]],
deduplicate_mode: data.DeduplicateMode, deduplicate_mode: data.DeduplicateMode,
deduplicate_ignore_columns: list[str], deduplicate_ignore_columns: list[str],
) -> tuple[Sequence[dict[str,typing.Any]], list[str]]: ) -> tuple[Sequence[dict[str, typing.Any]], list[str]]:
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
fieldnames = [] fieldnames = []
@ -141,9 +141,13 @@ def deduplicate_dicts(
return dicts, fieldnames return dicts, fieldnames
def normalize_dict(d: dict[str,typing.Any]) -> frozendict[str,typing.Any]: def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
return frozendict( return frozendict(
{k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None}, {
k: csv_str_to_value(str(v))
for k, v in d.items()
if csv_str_to_value(str(v)) is not None
},
) )
@ -168,7 +172,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
def extend_csv_file( def extend_csv_file(
csv_file: Path, csv_file: Path,
new_dicts: list[dict[str,typing.Any]], new_dicts: list[dict[str, typing.Any]],
deduplicate_mode: data.DeduplicateMode, deduplicate_mode: data.DeduplicateMode,
deduplicate_ignore_columns: list[str], deduplicate_ignore_columns: list[str],
) -> dict: ) -> dict:
@ -198,7 +202,7 @@ def extend_csv_file(
) )
writer.writeheader() writer.writeheader()
for d in dicts: for d in dicts:
writable_d = {k:csv_safe_value(v) for k,v in d.items()} writable_d = {k: csv_safe_value(v) for k, v in d.items()}
writer.writerow(writable_d) writer.writerow(writable_d)
del d, writable_d del d, writable_d
output_csv = csvfile_in_memory.getvalue() output_csv = csvfile_in_memory.getvalue()

View File

@ -24,7 +24,6 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
cal.add('version', '2.0') cal.add('version', '2.0')
for event_data in rows: for event_data in rows:
# Select data # Select data
possible_time_keys = [ possible_time_keys = [
k for k, v in event_data.items() if isinstance(v, datetime.date) k for k, v in event_data.items() if isinstance(v, datetime.date)
@ -41,7 +40,9 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
continue continue
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts]) title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
description = '\n\n'.join(event_data[k] for k in possible_name_keys[max_title_parts:]) description = '\n\n'.join(
event_data[k] for k in possible_name_keys[max_title_parts:]
)
# Create event # Create event
event = icalendar.Event() event = icalendar.Event()

View File

@ -4,4 +4,3 @@ import personal_data.main
def test_available(): def test_available():
names = personal_data.main.available_scraper_names() names = personal_data.main.available_scraper_names()
assert len(names) > 0 assert len(names) > 0