"""Obsidian Import.
|
|
|
|
Sub-module for importing time-based data into Obsidian.
|
|
"""
|
|
|
|
import datetime
from collections import defaultdict
from logging import getLogger
from pathlib import Path
from typing import Any

from personal_data.util import load_csv_file

from .obsidian import ObsidianVault, Event

logger = getLogger(__name__)

# Type aliases for parsed CSV data: a Row is one CSV record keyed by
# column name, and Rows is the whole parsed table.
Row = dict[str,Any]
Rows = list[Row]
def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Import workout rows into the vault as per-day statistics.

    Each row must contain a ``'Date'`` key; the workout columns listed in
    the mapping below are optional and skipped when absent.

    Returns the number of rows that caused at least one vault update.
    """
    # Input column -> (output statistic name, unit suffix).
    # Hoisted out of the row loop: it is loop-invariant.
    mapping = {
        'Cycling (mins)': ('Cycling (Duration)', 'minutes'),
        'Cycling (kcals)': ('Cycling (kcals)', ''),
        'Weight (Kg)': ('Weight (Kg)', ''),
    }

    num_updated = 0
    for row in rows:
        date = row['Date']
        was_updated = False

        for input_key, (output_key, unit) in mapping.items():
            value = row.get(input_key)
            if value is None:
                continue
            if unit:
                value = f'{value} {unit}'
            was_updated |= vault.add_statistic(date, output_key, value)
            # When the statistic was renamed, clear the legacy entry so the
            # old and new names don't coexist in the vault.
            if input_key != output_key:
                was_updated |= vault.add_statistic(date, input_key, None)

        if was_updated:
            num_updated += 1

    return num_updated
def import_step_counts_csv(
    vault: ObsidianVault, rows: Rows, minimum: int = 300,
) -> int:
    """Import step-count rows into the vault as a daily ``'Steps'`` statistic.

    Rows are grouped by the date of their ``'Start'`` timestamp and the
    ``'Steps'`` values are summed per day. Days whose total is below
    *minimum* are skipped (presumably partial recordings — the original
    hard-coded threshold of 300 is kept as the default).

    Returns the number of days whose note was updated.
    """
    # Group rows by calendar day of their start timestamp.
    rows_per_date: dict[datetime.date, Rows] = defaultdict(list)
    for row in rows:
        rows_per_date[row['Start'].date()].append(row)

    steps_per_date = {
        date: sum(r['Steps'] for r in day_rows)
        for date, day_rows in rows_per_date.items()
    }

    num_updated = 0
    for date, steps in steps_per_date.items():
        if steps < minimum:
            continue
        if vault.add_statistic(date, 'Steps', steps):
            num_updated += 1

    return num_updated
def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Record watched-episode rows in the vault as daily 'Watched' events.

    Rows are bucketed by the date of their ``'me.last_played_time'``
    timestamp and written through ``vault.add_events``. Returns the number
    of days whose note was updated.
    """
    # TODO: Update to using git_time_tracker event parsing system
    verb = 'Watched'

    episodes_by_day: dict = {}
    for episode in rows:
        day = episode['me.last_played_time'].date()
        episodes_by_day.setdefault(day, []).append(episode)

    def to_event(episode: Row) -> Event:
        # An episode is stored as a zero-length event, truncated to the minute.
        moment = episode['me.last_played_time'].time().replace(
            second=0, microsecond=0, fold=0,
        )
        comment = '{} Episode {}: *{}*'.format(
            episode['season.name'],
            episode['episode.index'],
            episode['episode.name'],
        )
        return Event(moment, moment, verb, episode['series.name'], comment)

    num_updated = 0
    for day, episodes in episodes_by_day.items():
        if vault.add_events(day, [to_event(e) for e in episodes]):
            num_updated += 1

    return num_updated
def _load_and_import(vault: ObsidianVault, data_path: Path, importer, limit=None) -> None:
    """Load a CSV file, optionally truncate it to *limit* rows, and run *importer*."""
    rows = load_csv_file(data_path)
    logger.info('Loaded CSV with %d lines', len(rows))
    if limit is not None:
        rows = rows[:limit]
    num_updated = importer(vault, rows)
    logger.info('Updated %d files', num_updated)


def import_data(obsidian_path: Path, dry_run=True):
    """Import all configured data sources into the vault at *obsidian_path*.

    When *dry_run* is true the vault is opened in read-only 'silent' mode,
    so nothing is written back to disk.
    """
    vault = ObsidianVault(obsidian_path, read_only='silent' if dry_run else None)

    # The individual importers are toggled manually while this pipeline
    # matures; flip the conditions below to enable or disable a source.
    if False:
        _load_and_import(vault, Path('/home/jmaa/Notes/workout.csv'), import_workout_csv)

    if False:
        _load_and_import(
            vault,
            Path('/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv'),
            import_step_counts_csv,
        )

    if True:
        # Limited to the first 7 rows while this importer is being tested.
        _load_and_import(
            vault,
            Path('output/show_episodes_watched.csv'),
            import_watched_series_csv,
            limit=7,
        )