
Compare commits

...

4 Commits

SHA1        Message                            Date
ecab909851  Ruff                               2024-10-11 00:56:15 +02:00  [Test Python / Test (push): successful in 32s]
33337cd1a2  Added Withings API                 2024-10-11 00:55:39 +02:00
d23ee1ce18  Support importing series events    2024-10-11 00:55:38 +02:00
d9f8047be4  ObsidianVault now supports events  2024-10-11 00:55:38 +02:00
9 changed files with 355 additions and 49 deletions

View File

@@ -10,11 +10,15 @@ from typing import Any
 from personal_data.util import load_csv_file
-from .obsidian import ObsidianVault
+from .obsidian import Event, ObsidianVault
 logger = getLogger(__name__)
-def import_workout_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> int:
+Row = dict[str, Any]
+Rows = list[Row]
+def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0
     for row in rows:
         date = row['Date']
@@ -40,22 +44,24 @@ def import_workout_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> int:
         del row, date
     return num_updated
-def import_step_counts_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> int:
+def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
     MINIMUM = 300
     num_updated = 0
-    rows_per_day = {}
+    rows_per_date = {}
     for row in rows:
         date = row['Start'].date()
-        rows_per_day.setdefault(date, [])
-        rows_per_day[date].append(row)
+        rows_per_date.setdefault(date, [])
+        rows_per_date[date].append(row)
         del date, row
-    steps_per_day = { date: sum(row['Steps'] for row in rows) for date, rows in rows_per_day.items()}
-    for date, steps in steps_per_day.items():
+    steps_per_date = {
+        date: sum(row['Steps'] for row in rows) for date, rows in rows_per_date.items()
+    }
+    for date, steps in steps_per_date.items():
         if steps < MINIMUM:
             continue
         was_updated = vault.add_statistic(date, 'Steps', steps)
@@ -65,14 +71,67 @@ def import_step_counts_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> int:
     return num_updated
+def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
+    # TODO: Update to using git_time_tracker event parsing system
+    verb = 'Watched'
+    num_updated = 0
+    rows_per_date = {}
+    for row in rows:
+        date = row['me.last_played_time'].date()
+        rows_per_date.setdefault(date, [])
+        rows_per_date[date].append(row)
+        del date, row
+    del rows
+    def map_to_event(row: Row) -> Event:
+        start = (
+            row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
+        )
+        end = start
+        comment = '{} Episode {}: *{}*'.format(
+            row['season.name'],
+            row['episode.index'],
+            row['episode.name'],
+        )
+        return Event(start, end, verb, row['series.name'], comment)
+    for date, rows in rows_per_date.items():
+        events = [map_to_event(row) for row in rows]
+        was_updated = vault.add_events(date, events)
+        if was_updated:
+            num_updated += 1
+        del date, was_updated
+    return num_updated
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
-    #data_path = Path('/home/jmaa/Notes/workout.csv')
-    data_path = Path('/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv')
-    rows = load_csv_file(data_path)
-    logger.info('Loaded CSV with %d lines', len(rows))
-    #num_updated = import_workout_csv(vault, rows)
-    num_updated = import_step_counts_csv(vault, rows)
-    logger.info('Updated %d files', num_updated)
+    if False:
+        data_path = Path('/home/jmaa/Notes/workout.csv')
+        rows = load_csv_file(data_path)
+        logger.info('Loaded CSV with %d lines', len(rows))
+        num_updated = import_workout_csv(vault, rows)
+        logger.info('Updated %d files', num_updated)
+    if False:
+        data_path = Path(
+            '/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
+        )
+        rows = load_csv_file(data_path)
+        logger.info('Loaded CSV with %d lines', len(rows))
+        num_updated = import_step_counts_csv(vault, rows)
+        logger.info('Updated %d files', num_updated)
+    if True:
+        data_path = Path('output/show_episodes_watched.csv')
+        rows = load_csv_file(data_path)
+        logger.info('Loaded CSV with %d lines', len(rows))
+        rows = rows[:7]
+        num_updated = import_watched_series_csv(vault, rows)
+        logger.info('Updated %d files', num_updated)
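Not part of the diff: a dry-run sketch of calling the new series importer directly, assuming the importer and vault modules above are importable under these hypothetical names and that an Obsidian vault exists at the given path. The row values are illustrative stand-ins for output/show_episodes_watched.csv.

```python
# Hypothetical module names; the row values below are illustrative, not real data.
import datetime
from pathlib import Path

from obsidian import ObsidianVault
from obsidian_import import import_watched_series_csv

# read_only='silent' makes add_events() log and skip writes, so nothing is modified.
vault = ObsidianVault(Path('/home/user/Notes'), read_only='silent')
rows = [
    {
        'me.last_played_time': datetime.datetime(2024, 10, 10, 21, 0),
        'series.name': 'Example Show',
        'season.name': 'Season 1',
        'episode.index': 1,
        'episode.name': 'Pilot',
    },
]
num_updated = import_watched_series_csv(vault, rows)
print('Updated', num_updated, 'files')  # stays 0 in silent mode, since no file is written
```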

View File

@@ -1,17 +1,49 @@
+import dataclasses
 import datetime
 import json
+import re
 from decimal import Decimal
 from logging import getLogger
 from pathlib import Path
 from typing import Any
 import frontmatter
+import marko
+import marko.md_renderer
 logger = getLogger(__name__)
 StatisticKey = str
+@dataclasses.dataclass(frozen=True)
+class Event:
+    start_time: datetime.time | None
+    end_time: datetime.time | None
+    verb: str
+    subject: str
+    comment: str
+@dataclasses.dataclass(frozen=True)
+class FileContents:
+    frontmatter: dict[str, Any]
+    blocks_pre_events: list
+    events: list[Event]
+    blocks_post_events: list
+MARKDOWN_PARSER = marko.Markdown()
+MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()
+FILE_FORMAT = """
+{blocks_pre_events}
+## Events
+{block_events}
+{blocks_post_events}
+"""
 class ObsidianVault:
     def __init__(self, vault_path: Path, read_only: bool = 'silent'):
         self.vault_path = vault_path
@@ -26,57 +58,103 @@ class ObsidianVault:
         self.read_only = read_only
     def get_statistic(
-        self, date: datetime.date, statistic_key: StatisticKey,
+        self,
+        date: datetime.date,
+        statistic_key: StatisticKey,
     ) -> Any | None:
-        try:
-            with open(self._date_file_path(date)) as f:
-                data = frontmatter.load(f)
-        except FileNotFoundError:
-            return None
-        return data.metadata.get(statistic_key)
+        if contents := self._get_date_contents(date):
+            return contents.frontmatter.get(statistic_key)
+        return None
     def add_statistic(
-        self, date: datetime.date, statistic_key: StatisticKey, amount: Any,
+        self,
+        date: datetime.date,
+        statistic_key: StatisticKey,
+        amount: Any,
     ) -> bool:
+        # Adjust arguments
+        if isinstance(amount, Decimal):
+            amount = float(amount)
+        # Check for silent
         if self.read_only == 'silent':
             logger.info(
-                'Real only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
+                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
                 date,
                 statistic_key,
                 amount,
             )
             return False
+        # Load contents
         self._create_date_if_not_present(date)
-        with open(self._date_file_path(date)) as f:
-            data = frontmatter.load(f)
-        if isinstance(amount, Decimal):
-            amount = float(amount)
-        if data.metadata.get(statistic_key) == amount:
+        contents = self._get_date_contents(date)
+        # Update contents
+        if contents.frontmatter.get(statistic_key) == amount:
             return False
-        data.metadata[statistic_key] = amount
+        contents.frontmatter[statistic_key] = amount
         if amount is None:
-            del data.metadata[statistic_key]
-        with open(self._date_file_path(date), 'wb') as f:
-            frontmatter.dump(data, f)
+            del contents.frontmatter[statistic_key]
+        # Save contents
+        self._save_contents(date, contents)
         return True
-    def add_event(self, date: datetime.date, verb: str, subject: str) -> None:
+    def add_events(self, date: datetime.date, events: list[Event]) -> bool:
         if self.read_only == 'silent':
             logger.info(
-                'Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb,
+                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
+                date,
+                events,
             )
-            return
+            return False
         self._create_date_if_not_present(date)
-        # TODO
+        contents = self._get_date_contents(date)
+        contents.events.extend(events)
+        self._save_contents(date, contents)
+        return True
+    def get_events(self, date: datetime.date) -> list[Event]:
+        contents = self._get_date_contents(date)
+        if contents is None:
+            return []
+        return contents.events
+    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
+        try:
+            with open(self._date_file_path(date)) as f:
+                file_frontmatter = frontmatter.load(f)
+        except FileNotFoundError:
+            return None
+        ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
+        (pre_events, list_block_items, post_events) = find_events_list_block(ast)
+        events = [parse_event_string(list_item) for list_item in list_block_items]
+        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
+    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
+        logger.info('Formatting file "%s"', date)
+        blocks_pre_events = ''.join(
+            MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
+        )
+        blocks_post_events = ''.join(
+            MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
+        )
+        block_events = '\n'.join(
+            '- ' + format_event_string(e) for e in unique(contents.events)
+        )
+        text = FILE_FORMAT.format(
+            blocks_pre_events=blocks_pre_events,
+            blocks_post_events=blocks_post_events,
+            block_events=block_events,
+        ).strip()
+        logger.info('Saving file "%s"', date)
+        with open(self._date_file_path(date), 'wb') as f:
+            frontmatter.dump(frontmatter.Post(text, **contents.frontmatter), f)
     def _create_date_if_not_present(self, date: datetime.date):
         date_file = self._date_file_path(date)
@@ -98,3 +176,62 @@ class ObsidianVault:
     def _daily_template_path(self):
         return (self.vault_path / self.template_file_path).with_suffix('.md')
+def find_events_list_block(ast) -> tuple[list, list[str], list]:
+    blocks = ast.children
+    for block_i, block in enumerate(blocks):
+        if (
+            isinstance(block, marko.block.Heading)
+            and block.children[0].children.lower() == 'events'
+        ):
+            events_block = ast.children[block_i + 1]
+            if isinstance(events_block, marko.block.List):
+                offset = 2
+                event_texts = [
+                    MARKDOWN_RENDERER.render_children(li).strip()
+                    for li in events_block.children
+                ]
+            else:
+                offset = 1
+                event_texts = []
+            return (blocks[:block_i], event_texts, blocks[block_i + offset :])
+    return (blocks, [], [])
+def format_event_string(event: Event) -> str:
+    assert event is not None
+    if (
+        event.start_time is None
+        and event.end_time is None
+        and event.subject is None
+        and event.verb is None
+    ):
+        return event.comment
+    return f'{event.start_time:%H:%M} | {event.verb} [[{event.subject}]]. {event.comment}'.strip()
+RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+?))?)'
+def parse_event_string(event_str: str) -> Event:
+    if m := re.match(
+        r'^\s*' + RE_TIME + r'[ :\|-]*(\w+ed)\s+\[([^\]]*)\]\([^)]*\)\.?\s*(.*)$',
+        event_str,
+    ):
+        start = datetime.time.fromisoformat(m.group(1))
+        return Event(start, start, m.group(2), m.group(3), m.group(4))
+    if m := re.match(
+        r'^\s*' + RE_TIME + '[ :\|-]*(\w+ed)\s+\[\[([^\]]*)\]\]\.?\s*(.*)$',
+        event_str,
+    ):
+        start = datetime.time.fromisoformat(m.group(1))
+        return Event(start, start, m.group(2), m.group(3), m.group(4))
+    logger.info('Could not parse format: %s', event_str)
+    return Event(None, None, None, None, event_str)
+def unique(ls: list) -> list:
+    return list(dict.fromkeys(ls))
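Not part of the diff: a small round-trip sketch of the new `## Events` list-item format, under the assumption that the helpers above are importable from an `obsidian` module (module path hypothetical); the series name and episode text are made up.

```python
import datetime

from obsidian import format_event_string, parse_event_string  # hypothetical module path

# A daily-note list item in the shape written by _save_contents / format_event_string.
line = '21:00 | Watched [[Example Show]]. Season 1 Episode 1: *Pilot*'

event = parse_event_string(line)             # matched by the second ("[[...]]") pattern
assert event.start_time == datetime.time(21, 0)
assert event.verb == 'Watched'
assert event.subject == 'Example Show'

assert format_event_string(event) == line    # rendering the Event reproduces the item
```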

View File

@@ -53,7 +53,8 @@ class LodestoneAchievementScraper(Scraper):
             ).group(1)
             time_acquired = int(time_acquired)
             time_acquired = datetime.datetime.fromtimestamp(
-                time_acquired, tz=datetime.UTC,
+                time_acquired,
+                tz=datetime.UTC,
             )
             trophy_desc = (
                 entry.select_one('.entry__activity__txt').get_text().strip()

View File

@@ -51,13 +51,18 @@ class JellyfinWatchHistoryScraper(Scraper):
         client = JellyfinClient()
         client.config.app(
-            'personal_data', _version.__version__, 'test_machine', 'unique_id_1',
+            'personal_data',
+            _version.__version__,
+            'test_machine',
+            'unique_id_1',
         )
         client.config.data['auth.ssl'] = False
         client.auth.connect_to_address(secrets.JELLYFIN_URL)
         client.auth.login(
-            secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD,
+            secrets.JELLYFIN_URL,
+            secrets.JELLYFIN_USERNAME,
+            secrets.JELLYFIN_PASSWORD,
         )
         for series_data in iterate_series(client):

View File

@@ -61,7 +61,8 @@ class SteamAchievementScraper(Scraper):
         soup = bs4.BeautifulSoup(response.content, 'lxml')
         game_name: str = re.match(
-            r'Steam Community :: (.+) :: .*', soup.head.title.get_text(),
+            r'Steam Community :: (.+) :: .*',
+            soup.head.title.get_text(),
         ).group(1)
         soup = html_util.normalize_soup_slightly(

View File

@@ -0,0 +1,95 @@
+"""Withings API fetcher.
+Supports downloading activity summary from the [Withings
+API](https://developer.withings.com/api-reference/) using the [non-official
+Withings API Python Client](https://pypi.org/project/withings-api/).
+"""
+import dataclasses
+import datetime
+import logging
+import pickle
+import subprocess
+from pathlib import Path
+import withings_api
+from withings_api.common import CredentialsType
+from personal_data import secrets
+from personal_data.data import DeduplicateMode, Scraper
+logger = logging.getLogger(__name__)
+CREDENTIALS_FILE = Path('secrets/withings_oath_creds')
+def save_credentials(credentials: CredentialsType) -> None:
+    """Save credentials to a file."""
+    logger.info('Saving credentials in: %s', CREDENTIALS_FILE)
+    with open(CREDENTIALS_FILE, 'wb') as file_handle:
+        pickle.dump(credentials, file_handle)
+def load_credentials() -> CredentialsType:
+    """Load credentials from a file."""
+    logger.info('Using credentials saved in: %s', CREDENTIALS_FILE)
+    try:
+        with open(CREDENTIALS_FILE, 'rb') as file_handle:
+            return pickle.load(file_handle)
+    except FileNotFoundError:
+        return None
+@dataclasses.dataclass(frozen=True)
+class WithingsActivityScraper(Scraper):
+    dataset_name = 'withings_activity'
+    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
+    @staticmethod
+    def requires_cfscrape() -> bool:
+        return False
+    def oauth_flow(self) -> CredentialsType:
+        if creds := load_credentials():
+            return creds
+        auth = withings_api.WithingsAuth(
+            client_id=secrets.WITHINGS_CLIENTID,
+            consumer_secret=secrets.WITHINGS_SECRET,
+            callback_uri=secrets.WITHINGS_CALLBACK_URI,
+            scope=(
+                withings_api.AuthScope.USER_ACTIVITY,
+                withings_api.AuthScope.USER_METRICS,
+                withings_api.AuthScope.USER_INFO,
+                withings_api.AuthScope.USER_SLEEP_EVENTS,
+            ),
+        )
+        authorize_url = auth.get_authorize_url()
+        subprocess.run(['firefox', '--new-tab', authorize_url])
+        credentials_code = input('Please insert your code here: ').strip()
+        creds = auth.get_credentials(credentials_code)
+        save_credentials(creds)
+        return creds
+    def scrape(self):
+        credentials = self.oauth_flow()
+        # Now you are ready to make calls for data.
+        api = withings_api.WithingsApi(credentials)
+        start = datetime.date.today() - datetime.timedelta(days=200)
+        end = datetime.date.today()
+        activity_result = api.measure_get_activity(
+            startdateymd=start,
+            enddateymd=end,
+        )
+        for activity in activity_result.activities:
+            sample = dict(activity)
+            sample['date'] = activity.date.date()
+            del sample['timezone'], sample['is_tracker']
+            yield sample
+            del activity, sample
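Not part of the diff: a condensed sketch of the OAuth-plus-fetch flow that the new WithingsActivityScraper wraps, restricted to the withings_api calls used in the file above. The credential values are placeholders (in the scraper they come from personal_data.secrets), and the exact fields in each activity payload are not shown here.

```python
import datetime

import withings_api

# Placeholder credentials; replace with real Withings developer app values.
auth = withings_api.WithingsAuth(
    client_id='<client id>',
    consumer_secret='<consumer secret>',
    callback_uri='<callback uri>',
    scope=(withings_api.AuthScope.USER_ACTIVITY,),
)
print('Authorize here:', auth.get_authorize_url())           # user logs in via browser
credentials = auth.get_credentials(input('Code: ').strip())  # code from the callback URI

api = withings_api.WithingsApi(credentials)
result = api.measure_get_activity(
    startdateymd=datetime.date.today() - datetime.timedelta(days=7),
    enddateymd=datetime.date.today(),
)
for activity in result.activities:                           # one summary per day
    print(activity.date.date(), dict(activity))
```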

View File

@@ -61,7 +61,9 @@ def get_session(
     if cfscrape:
         session_class = CachedCfScrape
     session = session_class(
-        OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT,
+        OUTPUT_PATH / 'web_cache',
+        cookies=cookiejar,
+        expire_after=CACHE_EXPIRE_DEFAULT,
     )
     for cookie in cookiejar:
         session.cookies.set_cookie(cookie)

View File

@@ -40,3 +40,8 @@ MAILGUN_RECIPIENT = load_secret('MAILGUN_RECIPIENT')
 JELLYFIN_URL = load_secret('JELLYFIN_URL')
 JELLYFIN_USERNAME = load_secret('JELLYFIN_USERNAME')
 JELLYFIN_PASSWORD = load_secret('JELLYFIN_PASSWORD')
+# Withings
+WITHINGS_CLIENTID = load_secret('WITHINGS_CLIENTID')
+WITHINGS_SECRET = load_secret('WITHINGS_SECRET')
+WITHINGS_CALLBACK_URI = load_secret('WITHINGS_CALLBACK_URI')

View File

@@ -1,3 +1,4 @@
+import _csv
 import csv
 import datetime
 import decimal
@@ -151,7 +152,7 @@ def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
 )
-def load_csv_file(csv_file: Path) -> list[frozendict]:
+def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]:
     dicts: list[frozendict] = []
     with open(csv_file) as csvfile:
         dialect = csv.Sniffer().sniff(csvfile.read(1024))
@@ -180,7 +181,7 @@ def extend_csv_file(
     try:
         dicts = load_csv_file(csv_file)
-    except FileNotFoundError as e:
+    except (FileNotFoundError, _csv.Error) as e:
         logger.info('Creating file: %s', csv_file)
         dicts = []