"""Obsidian Import.

Sub-module for importing time-based data into Obsidian.
"""

import dataclasses
import datetime
from collections.abc import Callable, Iterable, Iterator
from logging import getLogger
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo

from personal_data.activity import (
    ActivitySample,
    Label,
    RealizedActivitySample,
    heuristically_realize_samples,
    merge_adjacent_samples,
)
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end

from .obsidian import Event, ObsidianVault

logger = getLogger(__name__)

Row = dict[str, Any]
Rows = list[Row]

HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
SECOND = datetime.timedelta(seconds=1)

# Timezone used to assign activity samples to calendar dates when the caller
# of import_activity_sample_csv does not supply one.
DEFAULT_TIMEZONE_NAME = 'Europe/Copenhagen'


def to_text_duration(duration: datetime.timedelta) -> str:
    """Format a duration as text, e.g. ``'1 hours 5 minutes 30 seconds'``.

    Components that are not positive are left out, so a duration shorter
    than one second produces an empty string. Unit names are always plural.
    """
    hours = int(duration / HOUR)
    duration -= hours * HOUR
    minutes = int(duration / MINUTE)
    duration -= minutes * MINUTE
    seconds = int(duration / SECOND)

    parts = []
    if hours > 0:
        parts.append(f'{hours} hours')
    if minutes > 0:
        parts.append(f'{minutes} minutes')
    if seconds > 0:
        parts.append(f'{seconds} seconds')
    return ' '.join(parts)


def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
    """Yield one ``ActivitySample`` per row.

    The keys holding the start/end times and the labels are determined once,
    from the row in the middle of ``rows``, and then applied to every row.

    This is a generator: the checks below run when iteration starts, not
    when the function is called. ``rows`` must not be empty.
    """
    assert len(rows) > 0

    # Hopefully select a useful representative.
    possible_keys = determine_possible_keys(rows[len(rows) // 2])
    logger.info('Found possible keys: %s', possible_keys)
    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1

    for event_data in rows:
        start_at, end_at = start_end(event_data, possible_keys)
        labels = tuple(
            Label(k, event_data[k]) for k in possible_keys.misc if k in event_data
        )

        # Create event
        yield ActivitySample(
            labels=labels,
            start_at=start_at,
            end_at=end_at,
        )
        del event_data


def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Add workout statistics from each row to the vault.

    Each row is read for its ``'Date'`` and for the input columns listed in
    ``mapping``; columns whose value is ``None`` or absent are skipped.

    Returns the number of rows for which the vault reported an update.
    """
    # Input column -> (statistic name in the vault, unit appended to the value).
    mapping = {
        'Cycling (mins)': ('Cycling (Duration)', 'minutes'),
        'Cycling (kcals)': ('Cycling (kcals)', ''),
        'Weight (Kg)': ('Weight (Kg)', ''),
    }

    num_updated = 0
    for row in rows:
        date = row['Date']
        was_updated = False
        for input_key, (output_key, unit) in mapping.items():
            v = row.get(input_key)
            if v is not None:
                if unit:
                    v = f'{v} {unit}'
                was_updated |= vault.add_statistic(date, output_key, v)
                if input_key != output_key:
                    # NOTE(review): presumably clears a statistic previously
                    # stored under the raw column name - confirm against
                    # ObsidianVault.add_statistic.
                    was_updated |= vault.add_statistic(date, input_key, None)
            del input_key, output_key, unit, v
        if was_updated:
            num_updated += 1
        del row, date
    return num_updated


def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Add the total ``'Steps'`` per date to the vault.

    Rows are grouped by the date of their ``'Start'`` value and their
    ``'Steps'`` are summed. Dates whose total is below ``MINIMUM_STEPS`` are
    skipped.

    Returns the number of dates for which the vault reported an update.
    """
    MINIMUM_STEPS = 300

    rows_per_date = {}
    for row in rows:
        date = row['Start'].date()
        rows_per_date.setdefault(date, []).append(row)
        del date, row

    steps_per_date = {
        date: sum(row['Steps'] for row in date_rows)
        for date, date_rows in rows_per_date.items()
    }

    num_updated = 0
    for date, steps in steps_per_date.items():
        if steps < MINIMUM_STEPS:
            continue
        was_updated = vault.add_statistic(date, 'Steps', steps)
        if was_updated:
            num_updated += 1
        del date, steps, was_updated
    return num_updated


def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Add per-date Stepmania step totals and play durations to the vault.

    Rows are grouped by the date of their ``'play.start'`` value. For each
    date the ``score.w1`` .. ``score.w5`` columns are summed into
    ``'Stepmania (Steps)'`` and the ``'play.duration'`` values are summed and
    formatted into ``'Stepmania (Duration)'``.

    Returns the number of dates for which the vault reported an update to
    either statistic.
    """
    COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']

    def all_steps(row: dict[str, int]) -> int:
        return sum(row[column] for column in COLUMNS)

    rows_per_date = {}
    for row in rows:
        date = row['play.start'].date()
        rows_per_date.setdefault(date, []).append(row)
        del date, row

    steps_per_date = {
        date: sum(all_steps(row) for row in date_rows)
        for date, date_rows in rows_per_date.items()
    }
    duration_per_date = {
        date: sum(
            (row['play.duration'] for row in date_rows),
            start=datetime.timedelta(),
        )
        for date, date_rows in rows_per_date.items()
    }
    # Diagnostics go through the logger rather than stdout.
    logger.debug('Stepmania steps per date: %s', steps_per_date)
    logger.debug('Stepmania duration per date: %s', duration_per_date)

    num_updated = 0
    for date in steps_per_date:
        was_updated_1 = vault.add_statistic(
            date,
            'Stepmania (Steps)',
            int(steps_per_date[date]),
        )
        was_updated_2 = vault.add_statistic(
            date,
            'Stepmania (Duration)',
            to_text_duration(duration_per_date[date]),
        )
        if was_updated_1 or was_updated_2:
            num_updated += 1
        del date, was_updated_1, was_updated_2
    return num_updated


def escape_for_obsidian_link(link: str) -> str:
    """Replace ``:`` and ``/`` in ``link`` with spaces.

    Double spaces left behind by the replacement (e.g. from ``': '``) are
    then collapsed into a single space in one pass.
    """
    return link.replace(':', ' ').replace('/', ' ').replace('  ', ' ')


@dataclasses.dataclass(frozen=True)
class EventContent:
    """Textual parts of an event, as produced by a content mapper."""

    # Passed on as the ``verb`` of the created Event (e.g. 'Watched').
    verb: str
    # Passed through escape_for_obsidian_link before becoming the Event subject.
    subject: str
    # Passed on as the ``comment`` of the created Event.
    comment: str


def import_activity_sample_csv(
    vault: ObsidianVault,
    rows: Rows,
    content_mapper: Callable[[RealizedActivitySample], EventContent],
    group_category: str | None = None,
    *,
    timezone: datetime.tzinfo | None = None,
) -> int:
    """Convert rows to activity samples and add them to the vault as events.

    Args:
        vault: Vault receiving the events.
        rows: Rows to convert; an empty list results in no work and ``0``.
        content_mapper: Produces the verb/subject/comment for each sample.
        group_category: When given, samples are passed through
            ``merge_adjacent_samples`` with this category.
        timezone: Timezone used to decide which date a sample belongs to.
            Defaults to ``DEFAULT_TIMEZONE_NAME``.

    Returns:
        The number of dates for which the vault reported an update.
    """
    if not rows:
        # iterate_samples_from_rows requires at least one row.
        return 0
    if timezone is None:
        timezone = ZoneInfo(DEFAULT_TIMEZONE_NAME)

    samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
    if group_category is not None:
        samples = merge_adjacent_samples(list(samples), group_category)

    samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
    for sample in samples:
        date: datetime.date = sample.start_at.astimezone(timezone).date()
        samples_per_date.setdefault(date, []).append(sample)
        del date, sample
    del rows

    def map_to_event(sample: RealizedActivitySample) -> Event:
        content = content_mapper(sample)
        return Event(
            sample.start_at,
            sample.end_at,
            verb=content.verb,
            subject=escape_for_obsidian_link(content.subject),
            comment=content.comment,
        )

    num_updated = 0
    for date, samples_for_date in samples_per_date.items():
        events = [map_to_event(sample) for sample in samples_for_date]
        was_updated = vault.add_events(date, events)
        if was_updated:
            num_updated += 1
        del date, was_updated
    return num_updated


def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
    """Build the 'Watched' event content from a sample's series labels."""
    season = sample.single_label_with_category('season.name')
    episode_index = sample.single_label_with_category('episode.index')
    episode_name = sample.single_label_with_category('episode.name')
    return EventContent(
        verb='Watched',
        subject=sample.single_label_with_category('series.name'),
        comment=f'{season} Episode {episode_index}: *{episode_name}*',
    )


def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
    """Build the 'Played' event content from a sample's ``game.name`` label."""
    return EventContent(
        verb='Played',
        subject=sample.single_label_with_category('game.name'),
        comment='',
    )


PATH_WATCHED = Path('output/show_episodes_watched.csv')
PATH_PLAYED = Path('output/games_played.csv')
PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
PATH_STEP_COUNTS = Path(
    '/home/jmaa/Notes/Rawbackupdata/Steps/exportStepCount_2025-03-15_22-58-20',
)
PATH_STEPMANIA = Path('output/stepmania.csv')

# Each entry names a CSV file and the function that imports its rows.
# 'standard_variant' disables dialect sniffing when the file is loaded.
IMPORTERS = [
    {'path': PATH_WORKOUT, 'standard_variant': True, 'import_rows': import_workout_csv},
    {'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
    {
        'path': PATH_STEPMANIA,
        'standard_variant': True,
        'import_rows': import_stepmania_steps_csv,
    },
    {
        'path': PATH_PLAYED,
        'standard_variant': True,
        'import_rows': lambda vault, rows: import_activity_sample_csv(
            vault,
            rows,
            map_games_played_content,
            group_category='game.name',
        ),
    },
    {
        'path': PATH_WATCHED,
        'standard_variant': True,
        'import_rows': lambda vault, rows: import_activity_sample_csv(
            vault,
            rows,
            map_watched_series_content,
        ),
    },
]


def import_data(obsidian_path: Path, dry_run: bool = True) -> None:
    """Run every importer in ``IMPORTERS`` against the vault at ``obsidian_path``.

    Importers whose CSV file does not exist are skipped with a warning.
    With ``dry_run`` the vault is opened with ``read_only='silent'`` and its
    cache is not flushed; otherwise the cache is flushed at the end.
    """
    vault = ObsidianVault(obsidian_path, read_only='silent' if dry_run else None)

    for import_def in IMPORTERS:
        if not import_def['path'].exists():
            logger.warning(
                'Skipping %s: %s is missing',
                import_def['import_rows'],
                import_def['path'],
            )
            continue
        rows = load_csv_file(
            import_def['path'],
            sniff=not import_def.get('standard_variant'),
        )
        logger.info('Loaded CSV with %d lines', len(rows))
        num_files_updated = import_def['import_rows'](vault, rows)
        logger.info('Updated %d files', num_files_updated)
        del import_def, rows

    num_dirty = sum(1 for f in vault.internal_file_text_cache.values() if f.is_dirty)
    logger.info('dirty files in cache: %d', num_dirty)
    logger.info(
        'clean files in cache: %d',
        len(vault.internal_file_text_cache) - num_dirty,
    )
    if not dry_run:
        vault.flush_cache()