Ruff
All checks were successful
Test Python / Test (push) Successful in 32s

Jon Michael Aanes 2024-10-11 00:54:01 +02:00
parent 33337cd1a2
commit ecab909851
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
8 changed files with 114 additions and 59 deletions
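
Every hunk below is a formatting-only rewrite of the kind `ruff format` and `ruff check --fix` produce: import sorting, quote normalization, one-argument-per-line wrapping of long calls, and blank-line fixes. No runtime behaviour changes.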

View File

@@ -10,13 +10,14 @@ from typing import Any
 from personal_data.util import load_csv_file
 
-from .obsidian import ObsidianVault, Event
+from .obsidian import Event, ObsidianVault
 
 logger = getLogger(__name__)
 
-Row = dict[str,Any]
+Row = dict[str, Any]
 Rows = list[Row]
 
+
 def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0
     for row in rows:
@@ -43,6 +44,7 @@ def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
         del row, date
     return num_updated
 
+
 def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
     MINIMUM = 300
@@ -55,8 +57,9 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
         rows_per_date[date].append(row)
         del date, row
-    steps_per_date = { date: sum(row['Steps'] for row in rows) for date, rows in rows_per_date.items()}
+    steps_per_date = {
+        date: sum(row['Steps'] for row in rows) for date, rows in rows_per_date.items()
+    }
 
     for date, steps in steps_per_date.items():
         if steps < MINIMUM:
@@ -68,6 +71,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
     return num_updated
 
+
 def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
     # TODO: Update to using git_time_tracker event parsing system
     verb = 'Watched'
@@ -82,11 +86,16 @@ def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
         del date, row
     del rows
 
     def map_to_event(row: Row) -> Event:
-        start = row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
+        start = (
+            row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
+        )
         end = start
-        comment = '{} Episode {}: *{}*'.format(row['season.name'], row['episode.index'], row['episode.name'])
+        comment = '{} Episode {}: *{}*'.format(
+            row['season.name'],
+            row['episode.index'],
+            row['episode.name'],
+        )
         return Event(start, end, verb, row['series.name'], comment)
 
     for date, rows in rows_per_date.items():
@@ -99,6 +108,7 @@ def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
     return num_updated
 
+
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
@@ -110,7 +120,9 @@ def import_data(obsidian_path: Path, dry_run=True):
         logger.info('Updated %d files', num_updated)
 
     if False:
-        data_path = Path('/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv')
+        data_path = Path(
+            '/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
+        )
         rows = load_csv_file(data_path)
         logger.info('Loaded CSV with %d lines', len(rows))
         num_updated = import_step_counts_csv(vault, rows)
@@ -123,5 +135,3 @@ def import_data(obsidian_path: Path, dry_run=True):
         rows = rows[:7]
         num_updated = import_watched_series_csv(vault, rows)
         logger.info('Updated %d files', num_updated)
-
-
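
The `read_only=dry_run and 'silent' or None` argument above is the pre-Python-2.5 `and/or` conditional idiom; assuming `dry_run` is a plain boolean, it is equivalent to this one-line sketch:

    read_only = 'silent' if dry_run else None

Ruff's formatter only rewrites layout, never expressions, which is why the idiom survives the commit untouched.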

View File

@@ -1,21 +1,22 @@
+import dataclasses
 import datetime
 import json
 import re
-import marko
-import marko.md_renderer
-import dataclasses
 from decimal import Decimal
 from logging import getLogger
 from pathlib import Path
 from typing import Any
 
 import frontmatter
+import marko
+import marko.md_renderer
 
 logger = getLogger(__name__)
 
 StatisticKey = str
 
-@dataclasses.dataclass(frozen = True)
+
+@dataclasses.dataclass(frozen=True)
 class Event:
     start_time: datetime.time | None
     end_time: datetime.time | None
@@ -23,22 +24,25 @@ class Event:
     subject: str
     comment: str
 
-@dataclasses.dataclass(frozen = True)
+
+@dataclasses.dataclass(frozen=True)
 class FileContents:
     frontmatter: dict[str, Any]
     blocks_pre_events: list
     events: list[Event]
     blocks_post_events: list
 
 MARKDOWN_PARSER = marko.Markdown()
 MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()
 
-FILE_FORMAT='''
+FILE_FORMAT = """
 {blocks_pre_events}
 ## Events
 {block_events}
 {blocks_post_events}
-'''
+"""
 
+
 class ObsidianVault:
     def __init__(self, vault_path: Path, read_only: bool = 'silent'):
@@ -54,14 +58,19 @@ class ObsidianVault:
         self.read_only = read_only
 
     def get_statistic(
-        self, date: datetime.date, statistic_key: StatisticKey,
+        self,
+        date: datetime.date,
+        statistic_key: StatisticKey,
     ) -> Any | None:
         if contents := self._get_date_contents(date):
             return contents.frontmatter.get(statistic_key)
         return None
 
     def add_statistic(
-        self, date: datetime.date, statistic_key: StatisticKey, amount: Any,
+        self,
+        date: datetime.date,
+        statistic_key: StatisticKey,
+        amount: Any,
     ) -> bool:
         # Adjust arguments
         if isinstance(amount, Decimal):
@@ -96,7 +105,9 @@ class ObsidianVault:
     def add_events(self, date: datetime.date, events: list[Event]) -> bool:
         if self.read_only == 'silent':
             logger.info(
-                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)', date, events,
+                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
+                date,
+                events,
             )
             return False
@@ -126,10 +137,20 @@ class ObsidianVault:
     def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
         logger.info('Formatting file "%s"', date)
-        blocks_pre_events = ''.join(MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events)
-        blocks_post_events = ''.join(MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events)
-        block_events = '\n'.join('- ' + format_event_string(e) for e in unique(contents.events))
-        text = FILE_FORMAT.format(blocks_pre_events=blocks_pre_events,blocks_post_events=blocks_post_events,block_events=block_events).strip()
+        blocks_pre_events = ''.join(
+            MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
+        )
+        blocks_post_events = ''.join(
+            MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
+        )
+        block_events = '\n'.join(
+            '- ' + format_event_string(e) for e in unique(contents.events)
+        )
+        text = FILE_FORMAT.format(
+            blocks_pre_events=blocks_pre_events,
+            blocks_post_events=blocks_post_events,
+            block_events=block_events,
+        ).strip()
 
         logger.info('Saving file "%s"', date)
         with open(self._date_file_path(date), 'wb') as f:
@@ -156,39 +177,61 @@ class ObsidianVault:
     def _daily_template_path(self):
         return (self.vault_path / self.template_file_path).with_suffix('.md')
 
+
 def find_events_list_block(ast) -> tuple[list, list[str], list]:
     blocks = ast.children
     for block_i, block in enumerate(blocks):
-        if isinstance(block, marko.block.Heading) and block.children[0].children.lower() == 'events':
-            events_block = ast.children[block_i+1]
+        if (
+            isinstance(block, marko.block.Heading)
+            and block.children[0].children.lower() == 'events'
+        ):
+            events_block = ast.children[block_i + 1]
             if isinstance(events_block, marko.block.List):
                 offset = 2
-                event_texts = [MARKDOWN_RENDERER.render_children(li).strip() for li in events_block.children]
+                event_texts = [
+                    MARKDOWN_RENDERER.render_children(li).strip()
+                    for li in events_block.children
+                ]
             else:
                 offset = 1
                 event_texts = []
-            return (blocks[:block_i], event_texts, blocks[block_i+offset:])
+            return (blocks[:block_i], event_texts, blocks[block_i + offset :])
     return (blocks, [], [])
 
+
 def format_event_string(event: Event) -> str:
     assert event is not None
-    if event.start_time is None and event.end_time is None and event.subject is None and event.verb is None:
+    if (
+        event.start_time is None
+        and event.end_time is None
+        and event.subject is None
+        and event.verb is None
+    ):
         return event.comment
 
     return f'{event.start_time:%H:%M} | {event.verb} [[{event.subject}]]. {event.comment}'.strip()
 
+
 RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+?))?)'
 
+
 def parse_event_string(event_str: str) -> Event:
-    if m := re.match(r'^\s*'+RE_TIME+r'[ :\|-]*(\w+ed)\s+\[([^\]]*)\]\([^)]*\)\.?\s*(.*)$', event_str):
+    if m := re.match(
+        r'^\s*' + RE_TIME + r'[ :\|-]*(\w+ed)\s+\[([^\]]*)\]\([^)]*\)\.?\s*(.*)$',
+        event_str,
+    ):
         start = datetime.time.fromisoformat(m.group(1))
         return Event(start, start, m.group(2), m.group(3), m.group(4))
-    if m := re.match(r'^\s*'+RE_TIME+'[ :\|-]*(\w+ed)\s+\[\[([^\]]*)\]\]\.?\s*(.*)$', event_str):
+    if m := re.match(
+        r'^\s*' + RE_TIME + '[ :\|-]*(\w+ed)\s+\[\[([^\]]*)\]\]\.?\s*(.*)$',
+        event_str,
+    ):
         start = datetime.time.fromisoformat(m.group(1))
         return Event(start, start, m.group(2), m.group(3), m.group(4))
     logger.info('Could not parse format: %s', event_str)
-    return Event(None,None,None,None,event_str)
+    return Event(None, None, None, None, event_str)
 
+
 def unique(ls: list) -> list:
     return list(dict.fromkeys(ls))
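
A worked round-trip of the event helpers above (values hypothetical; assumes the `Event`, `format_event_string`, and `parse_event_string` definitions from this file):

    import datetime

    event = Event(
        start_time=datetime.time(21, 30),
        end_time=datetime.time(21, 30),
        verb='Watched',
        subject='Fallout',
        comment='Episode 3: *The Head*.',
    )
    line = format_event_string(event)
    # '21:30 | Watched [[Fallout]]. Episode 3: *The Head*.'
    assert parse_event_string(line) == event

The second regex branch of `parse_event_string` consumes the period after the `[[...]]` wiki-link, so formatting followed by parsing yields the original event.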

View File

@@ -53,7 +53,8 @@ class LodestoneAchievementScraper(Scraper):
             ).group(1)
             time_acquired = int(time_acquired)
             time_acquired = datetime.datetime.fromtimestamp(
-                time_acquired, tz=datetime.UTC,
+                time_acquired,
+                tz=datetime.UTC,
             )
             trophy_desc = (
                 entry.select_one('.entry__activity__txt').get_text().strip()
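
Context for the `tz=` argument being wrapped here: `datetime.datetime.fromtimestamp` returns a naive local-time value unless a timezone is supplied. A minimal check (arbitrary epoch value; `datetime.UTC` needs Python 3.11+):

    import datetime

    dt = datetime.datetime.fromtimestamp(1728600841, tz=datetime.UTC)
    assert dt.tzinfo is datetime.UTC  # timezone-aware, in UTC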

View File

@@ -51,13 +51,18 @@ class JellyfinWatchHistoryScraper(Scraper):
         client = JellyfinClient()
         client.config.app(
-            'personal_data', _version.__version__, 'test_machine', 'unique_id_1',
+            'personal_data',
+            _version.__version__,
+            'test_machine',
+            'unique_id_1',
         )
         client.config.data['auth.ssl'] = False
         client.auth.connect_to_address(secrets.JELLYFIN_URL)
         client.auth.login(
-            secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD,
+            secrets.JELLYFIN_URL,
+            secrets.JELLYFIN_USERNAME,
+            secrets.JELLYFIN_PASSWORD,
        )
 
         for series_data in iterate_series(client):

View File

@@ -61,7 +61,8 @@ class SteamAchievementScraper(Scraper):
         soup = bs4.BeautifulSoup(response.content, 'lxml')
         game_name: str = re.match(
-            r'Steam Community :: (.+) :: .*', soup.head.title.get_text(),
+            r'Steam Community :: (.+) :: .*',
+            soup.head.title.get_text(),
         ).group(1)
         soup = html_util.normalize_soup_slightly(
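
The title regex in this hunk leans on greedy matching: `(.+)` swallows as much as possible, then backtracks to the last ` :: ` separator. A quick sketch with a made-up page title:

    import re

    title = 'Steam Community :: Hades :: Achievements'
    game_name = re.match(r'Steam Community :: (.+) :: .*', title).group(1)
    assert game_name == 'Hades'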

View File

@@ -5,47 +5,41 @@ API](https://developer.withings.com/api-reference/) using the [non-official
 Withings API Python Client](https://pypi.org/project/withings-api/).
 """
 
-import withings_api
-from withings_api.common import get_measure_value, MeasureType, CredentialsType
-import datetime
 import dataclasses
+import datetime
 import logging
-import re
-from collections.abc import Iterator
+import pickle
 import subprocess
+from pathlib import Path
 
-import bs4
-import requests_util
+import withings_api
+from withings_api.common import CredentialsType
 
-import personal_data.html_util
 from personal_data import secrets
 from personal_data.data import DeduplicateMode, Scraper
-from .. import parse_util
-import pickle
-from pathlib import Path
 
 logger = logging.getLogger(__name__)
 
 CREDENTIALS_FILE = Path('secrets/withings_oath_creds')
 
 
 def save_credentials(credentials: CredentialsType) -> None:
     """Save credentials to a file."""
-    logger.info("Saving credentials in: %s", CREDENTIALS_FILE)
-    with open(CREDENTIALS_FILE, "wb") as file_handle:
+    logger.info('Saving credentials in: %s', CREDENTIALS_FILE)
+    with open(CREDENTIALS_FILE, 'wb') as file_handle:
         pickle.dump(credentials, file_handle)
 
 
 def load_credentials() -> CredentialsType:
     """Load credentials from a file."""
-    logger.info("Using credentials saved in: %s", CREDENTIALS_FILE)
+    logger.info('Using credentials saved in: %s', CREDENTIALS_FILE)
     try:
-        with open(CREDENTIALS_FILE, "rb") as file_handle:
+        with open(CREDENTIALS_FILE, 'rb') as file_handle:
             return pickle.load(file_handle)
     except FileNotFoundError:
         return None
 
 
 @dataclasses.dataclass(frozen=True)
 class WithingsActivityScraper(Scraper):
     dataset_name = 'withings_activity'
@@ -86,12 +80,12 @@ class WithingsActivityScraper(Scraper):
         # Now you are ready to make calls for data.
         api = withings_api.WithingsApi(credentials)
 
-        start = datetime.date.today() - datetime.timedelta(days = 200)
+        start = datetime.date.today() - datetime.timedelta(days=200)
         end = datetime.date.today()
         activity_result = api.measure_get_activity(
             startdateymd=start,
             enddateymd=end,
         )
 
         for activity in activity_result.activities:
             sample = dict(activity)
@@ -99,4 +93,3 @@ class WithingsActivityScraper(Scraper):
             del sample['timezone'], sample['is_tracker']
             yield sample
             del activity, sample
-
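
How the two credential helpers are meant to compose (a sketch; the interactive Withings OAuth authorization step is deliberately elided):

    credentials = load_credentials()
    if credentials is None:
        # Run the Withings OAuth flow to obtain a CredentialsType, then cache it.
        credentials = ...
        save_credentials(credentials)
    api = withings_api.WithingsApi(credentials)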

View File

@@ -61,7 +61,9 @@ def get_session(
     if cfscrape:
         session_class = CachedCfScrape
     session = session_class(
-        OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT,
+        OUTPUT_PATH / 'web_cache',
+        cookies=cookiejar,
+        expire_after=CACHE_EXPIRE_DEFAULT,
     )
     for cookie in cookiejar:
         session.cookies.set_cookie(cookie)

View File

@@ -1,7 +1,7 @@
+import _csv
 import csv
 import datetime
 import decimal
-import _csv
 import io
 import logging
 import typing
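
The `_csv` move in this last hunk is plain isort-style ordering: sorting is ASCII-based, and `_` (0x5F) precedes every lowercase letter, so the C accelerator module sorts ahead of `csv`:

    assert sorted(['csv', '_csv']) == ['_csv', 'csv']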