"""Obsidian Import. Sub-module for importing time-based data into Obsidian. """ import datetime from logging import getLogger from pathlib import Path from typing import Any from collections.abc import Iterator from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file from personal_data.activity import ActivitySample, Label, RealizedActivitySample, heuristically_realize_samples from .obsidian import Event, ObsidianVault logger = getLogger(__name__) Row = dict[str, Any] Rows = list[Row] def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]: assert len(rows) > 0 if True: event_data = rows[len(rows) // 2] # Hopefully select a useful representative. possible_keys = determine_possible_keys(event_data) logger.info('Found possible keys: %s', possible_keys) del event_data assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1 assert len(possible_keys.image) >= 0 for event_data in rows: (start_at, end_at) = start_end(event_data, possible_keys) labels = [Label(k, event_data[k]) for k in possible_keys.misc] # Create event yield ActivitySample( labels=tuple(labels), start_at=start_at, end_at=end_at, ) del event_data def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int: num_updated = 0 for row in rows: date = row['Date'] was_updated = False mapping = { 'Cycling (mins)': ('Cycling (Duration)', 'minutes'), 'Cycling (kcals)': ('Cycling (kcals)', ''), 'Weight (Kg)': ('Weight (Kg)', ''), } for input_key, (output_key, unit) in mapping.items(): v = row.get(input_key) if v is not None: if unit: v = str(v) + ' ' + unit was_updated |= vault.add_statistic(date, output_key, v) if input_key != output_key: was_updated |= vault.add_statistic(date, input_key, None) del input_key, output_key, unit, v if was_updated: num_updated += 1 del row, date return num_updated def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int: MINIMUM = 300 num_updated = 0 rows_per_date = {} for row in rows: date = row['Start'].date() rows_per_date.setdefault(date, []) rows_per_date[date].append(row) del date, row steps_per_date = { date: sum(row['Steps'] for row in rows) for date, rows in rows_per_date.items() } for date, steps in steps_per_date.items(): if steps < MINIMUM: continue was_updated = vault.add_statistic(date, 'Steps', steps) if was_updated: num_updated += 1 del date, steps, was_updated return num_updated def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int: verb = 'Watched' samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows))) samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {} for sample in samples: date: datetime.date = sample.start_at.date() samples_per_date.setdefault(date, []) samples_per_date[date].append(sample) del date, sample del rows def map_to_event(sample: RealizedActivitySample) -> Event: comment = '{} Episode {}: *{}*'.format( sample.single_label_with_category('season.name'), sample.single_label_with_category('episode.index'), sample.single_label_with_category('episode.name'), ) return Event(sample.start_at.time(), sample.end_at.time(), verb, sample.single_label_with_category('series.name'), comment, ) num_updated = 0 for date, samples in samples_per_date.items(): events = [map_to_event(sample) for sample in samples] was_updated = vault.add_events(date, events) if was_updated: num_updated += 1 del date, was_updated return num_updated def import_data(obsidian_path: Path, dry_run=True): vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None) if 
def import_data(obsidian_path: Path, dry_run=True):
    # In dry-run mode, open the vault read-only (silently, without warnings).
    vault = ObsidianVault(obsidian_path, read_only='silent' if dry_run else None)

    if False:
        data_path = Path('/home/jmaa/Notes/workout.csv')
        rows = load_csv_file(data_path)
        logger.info('Loaded CSV with %d lines', len(rows))
        num_updated = import_workout_csv(vault, rows)
        logger.info('Updated %d files', num_updated)

    if False:
        data_path = Path(
            '/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
        )
        rows = load_csv_file(data_path)
        logger.info('Loaded CSV with %d lines', len(rows))
        num_updated = import_step_counts_csv(vault, rows)
        logger.info('Updated %d files', num_updated)

    if True:
        data_path = Path('output/show_episodes_watched.csv')
        rows = load_csv_file(data_path)
        logger.info('Loaded CSV with %d lines', len(rows))
        rows = rows[:7]  # Limit the number of rows while testing.
        num_updated = import_watched_series_csv(vault, rows)
        logger.info('Updated %d files', num_updated)
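

# A minimal sketch of how import_data might be invoked directly. The CLI flag
# name below is an assumption for illustration, not part of the existing
# module:
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(
        description='Import time-based data into Obsidian.',
    )
    parser.add_argument('obsidian_path', type=Path, help='Path to the Obsidian vault')
    parser.add_argument(
        '--apply',
        action='store_true',
        help='Actually write changes (default is a read-only dry run)',
    )
    args = parser.parse_args()
    import_data(args.obsidian_path, dry_run=not args.apply)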