# personal-data/obsidian_import/__init__.py
"""Obsidian Import.
Sub-module for importing time-based data into Obsidian.
"""
import dataclasses
import datetime
from collections.abc import Iterable, Iterator
from logging import getLogger
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
from personal_data.activity import (
ActivitySample,
Label,
RealizedActivitySample,
heuristically_realize_samples,
merge_adjacent_samples,
)
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
from .obsidian import Event, ObsidianVault
logger = getLogger(__name__)
# A CSV row as produced by load_csv_file: column name -> cell value.
Row = dict[str, Any]
Rows = list[Row]
# One-unit timedeltas, used when decomposing durations into components.
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
SECOND = datetime.timedelta(seconds=1)
def to_text_duration(duration: datetime.timedelta) -> str:
    """Render a duration as e.g. ``'1 hours 2 minutes 3 seconds'``.

    Zero-valued components are omitted; a zero duration yields ``''``.
    Sub-second precision is truncated.
    """
    # divmod replaces the original repeated subtract-and-truncate steps;
    # identical results for non-negative durations.
    total_seconds = int(duration.total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    parts = []
    if hours > 0:
        parts.append(f'{hours} hours')
    if minutes > 0:
        parts.append(f'{minutes} minutes')
    if seconds > 0:
        parts.append(f'{seconds} seconds')
    return ' '.join(parts)
def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
    """Yield an ActivitySample per CSV row.

    Column roles (start/end timestamps, misc label columns) are detected
    heuristically from a single representative row and then applied to all
    rows.

    Raises:
        AssertionError: if ``rows`` is empty or no time column is found.
    """
    assert len(rows) > 0

    # Hopefully select a useful representative row for key detection.
    # (Removed a pointless `if True:` wrapper around this section.)
    representative = rows[len(rows) // 2]
    possible_keys = determine_possible_keys(representative)
    logger.info('Found possible keys: %s', possible_keys)
    del representative

    # At least one start or end time column must have been detected.
    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
    # Vacuously true, but raises TypeError if `.image` is not a sized
    # collection — effectively a type sanity check.
    assert len(possible_keys.image) >= 0

    for event_data in rows:
        (start_at, end_at) = start_end(event_data, possible_keys)
        labels = [
            Label(k, event_data.get(k)) for k in possible_keys.misc if k in event_data
        ]
        yield ActivitySample(
            labels=tuple(labels),
            start_at=start_at,
            end_at=end_at,
        )
def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Import workout CSV rows as per-day statistics in the vault.

    Args:
        vault: Destination vault.
        rows: Workout rows; each must carry a 'Date' column.

    Returns:
        Number of daily notes that were updated.
    """
    # Input column -> (output statistic name, unit suffix).
    # Hoisted out of the row loop: it is constant, and the original
    # rebuilt this dict on every iteration.
    mapping = {
        'Cycling (mins)': ('Cycling (Duration)', 'minutes'),
        'Cycling (kcals)': ('Cycling (kcals)', ''),
        'Weight (Kg)': ('Weight (Kg)', ''),
    }
    num_updated = 0
    for row in rows:
        date = row['Date']
        was_updated = False
        for input_key, (output_key, unit) in mapping.items():
            value = row.get(input_key)
            if value is None:
                continue
            if unit:
                value = f'{value} {unit}'
            was_updated |= vault.add_statistic(date, output_key, value)
            if input_key != output_key:
                # The statistic was renamed; clear it under the old name.
                was_updated |= vault.add_statistic(date, input_key, None)
        if was_updated:
            num_updated += 1
    return num_updated
def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Aggregate step-count rows per calendar day and store them.

    Days whose total falls below a minimum threshold are skipped.

    Returns:
        Number of daily notes that were updated.
    """
    MINIMUM_STEPS = 300

    # Group rows by the calendar date of their start timestamp.
    grouped: dict = {}
    for entry in rows:
        grouped.setdefault(entry['Start'].date(), []).append(entry)

    num_updated = 0
    for day, day_rows in grouped.items():
        total_steps = sum(r['Steps'] for r in day_rows)
        # Ignore days with negligible counts.
        if total_steps < MINIMUM_STEPS:
            continue
        if vault.add_statistic(day, 'Steps', total_steps):
            num_updated += 1
    return num_updated
def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
    """Import StepMania plays as per-day step and duration statistics.

    Returns:
        Number of daily notes that were updated.
    """
    # Judgement columns whose counts add up to the number of steps hit.
    COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']

    # Group plays by the calendar date they started on.
    rows_per_date: dict = {}
    for row in rows:
        rows_per_date.setdefault(row['play.start'].date(), []).append(row)
    del row

    def all_steps(row: dict[str, int]) -> int:
        """Total judged steps for one play."""
        return sum(row[column] for column in COLUMNS)

    steps_per_date = {
        date: sum(all_steps(row) for row in rows)
        for date, rows in rows_per_date.items()
    }
    duration_per_date = {
        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta())
        for date, rows in rows_per_date.items()
    }
    # These were bare print() calls; diagnostics belong on the logger,
    # not stdout.
    logger.debug('Steps per date: %s', steps_per_date)
    logger.debug('Duration per date: %s', duration_per_date)

    num_updated = 0
    for date in steps_per_date:
        updated_steps = vault.add_statistic(
            date,
            'Stepmania (Steps)',
            int(steps_per_date[date]),
        )
        updated_duration = vault.add_statistic(
            date,
            'Stepmania (Duration)',
            to_text_duration(duration_per_date[date]),
        )
        if updated_steps or updated_duration:
            num_updated += 1
    return num_updated
def escape_for_obsidian_link(link: str) -> str:
    # Replace characters that would break an Obsidian wiki-link (':' and
    # '/') with spaces.
    # NOTE(review): the final replace reads as space -> space, i.e. a
    # no-op; presumably one side was originally a different whitespace
    # character (e.g. a non-breaking space) — confirm against history.
    return link.replace(':', ' ').replace('/', ' ').replace(' ', ' ')
@dataclasses.dataclass(frozen=True)
class EventContent:
    """Textual pieces of a calendar event to be written to the vault."""

    # Action performed, e.g. 'Watched' or 'Played'.
    verb: str
    # What the verb applied to; rendered as an Obsidian link target.
    subject: str
    # Free-form extra text attached to the event (may be empty).
    comment: str
def import_activity_samples(
    vault: ObsidianVault,
    raw_samples: list[ActivitySample],
    content_mapper,
    group_category: str | None = None,
    default_estimated_duration: datetime.timedelta | None = None,
) -> int:
    """Write activity samples into the vault as per-day events.

    Args:
        vault: Destination vault.
        raw_samples: Samples, possibly lacking concrete start/end times.
        content_mapper: Callable mapping a RealizedActivitySample to an
            EventContent.
        group_category: If given, adjacent samples sharing this label
            category are merged into a single event.
        default_estimated_duration: Fallback duration used when realizing
            samples without a known length.

    Returns:
        Number of daily notes that were updated.
    """
    samples = heuristically_realize_samples(
        raw_samples,
        default_estimated_duration=default_estimated_duration,
    )
    if group_category is not None:
        samples = merge_adjacent_samples(list(samples), group_category)

    timezone = ZoneInfo(
        'Europe/Copenhagen',
    )  # TODO: Parameterize in an intelligent manner

    # Bucket samples by the local date they started on.
    # (The trailing `del date, sample` was removed: it raised NameError
    # whenever the sample stream was empty.)
    samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
    for sample in samples:
        date = sample.start_at.astimezone(timezone).date()
        samples_per_date.setdefault(date, []).append(sample)

    def map_to_event(sample: RealizedActivitySample) -> Event:
        """Convert one realized sample into a vault Event."""
        content = content_mapper(sample)
        return Event(
            sample.start_at,
            sample.end_at,
            verb=content.verb,
            subject=escape_for_obsidian_link(content.subject),
            comment=content.comment,
        )

    num_updated = 0
    # No list() copy needed: the dict is not mutated during iteration.
    for date, day_samples in samples_per_date.items():
        events = [map_to_event(s) for s in day_samples]
        if vault.add_events(date, events):
            num_updated += 1
    return num_updated
def import_activity_sample_csv(
    vault: ObsidianVault,
    rows: Rows,
    content_mapper,
    group_category: str | None = None,
) -> int:
    """Parse CSV rows into samples and import them as vault events.

    Thin wrapper: row parsing is delegated to iterate_samples_from_rows,
    the actual import to import_activity_samples.
    """
    return import_activity_samples(
        vault,
        list(iterate_samples_from_rows(rows)),
        content_mapper,
        group_category,
    )
def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
    """Build the 'Watched <series>' event content for an episode sample."""
    show = sample.single_label_with_category('series.name')
    season = sample.single_label_with_category('season.name')
    index = sample.single_label_with_category('episode.index')
    episode = sample.single_label_with_category('episode.name')
    return EventContent(
        verb='Watched',
        subject=show,
        comment=f'{season} Episode {index}: *{episode}*',
    )
def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
    """Build the 'Played <game>' event content for a gaming sample."""
    return EventContent(
        verb='Played',
        subject=sample.single_label_with_category('game.name'),
        comment='',
    )
def import_wanikani_events(vault: ObsidianVault, rows: Rows):
    """Import WaniKani lesson timestamps as 'Practiced' events.

    Each of the four progress timestamps present on a row becomes its own
    sample; samples are then realized heuristically and merged per
    application name.
    """
    # Timestamp columns that each mark a practice occurrence.
    keys = ['unlocked_at', 'started_at', 'passed_at', 'burned_at']
    raw_samples = []
    for row in rows:
        for k in keys:
            if k in row:
                # Keyword arguments and a labels tuple, consistent with
                # the construction in iterate_samples_from_rows.
                # start_at is unknown here and realized heuristically.
                raw_samples.append(
                    ActivitySample(
                        labels=(Label('application.name', 'WaniKani'),),
                        start_at=None,
                        end_at=row[k],
                    ),
                )

    def mapper(sample: RealizedActivitySample) -> EventContent:
        """Every WaniKani event reads 'Practiced WaniKani'."""
        return EventContent(
            verb='Practiced',
            subject=sample.single_label_with_category('application.name'),
            comment='',
        )

    return import_activity_samples(
        vault,
        raw_samples,
        mapper,
        group_category='application.name',
        default_estimated_duration=datetime.timedelta(minutes=5),
    )
# CSV inputs under output/ — presumably produced by other personal_data
# fetchers; verify against the rest of the project.
PATH_WATCHED = Path('output/show_episodes_watched.csv')
PATH_PLAYED = Path('output/games_played.csv')
PATH_WANIKANI = Path('output/wanikani_lessons.csv')
# Absolute paths into a specific user's notes/export area.
# TODO(review): hard-coded home-directory paths limit portability.
PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
PATH_STEP_COUNTS = Path(
    '/home/jmaa/Notes/Rawbackupdata/Steps/exportStepCount_2025-03-15_22-58-20',
)
PATH_STEPMANIA = Path('output/stepmania.csv')
# Import job registry, consumed by import_data. Each entry:
#   path:             CSV file to load; the job is skipped if it is missing.
#   standard_variant: when truthy, CSV dialect sniffing is disabled in
#                     load_csv_file (sniff=not standard_variant).
#   import_rows:      callable (vault, rows) -> number of files updated.
IMPORTERS = [
    {'path': PATH_WANIKANI, 'standard_variant': True, 'import_rows': import_wanikani_events},
    {'path': PATH_WORKOUT, 'standard_variant': True, 'import_rows': import_workout_csv},
    {'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
    {
        'path': PATH_STEPMANIA,
        'standard_variant': True,
        'import_rows': import_stepmania_steps_csv,
    },
    {
        'path': PATH_PLAYED,
        'standard_variant': True,
        'import_rows': lambda vault, rows: import_activity_sample_csv(
            vault,
            rows,
            map_games_played_content,
            group_category='game.name',
        ),
    },
    {
        'path': PATH_WATCHED,
        'standard_variant': True,
        'import_rows': lambda vault, rows: import_activity_sample_csv(
            vault,
            rows,
            map_watched_series_content,
        ),
    },
]
def import_data(obsidian_path: Path, dry_run=True):
    """Run every configured importer against the vault at obsidian_path.

    Args:
        obsidian_path: Root directory of the Obsidian vault.
        dry_run: When true, the vault is opened read-only (in 'silent'
            mode) and the cache is never flushed to disk.
    """
    # Conditional expression replaces the legacy `cond and a or b` idiom.
    vault = ObsidianVault(obsidian_path, read_only='silent' if dry_run else None)

    for import_def in IMPORTERS:
        if not import_def['path'].exists():
            logger.warning(
                'Skipping %s: %s is missing',
                import_def['import_rows'],
                import_def['path'],
            )
            continue
        rows = load_csv_file(
            import_def['path'],
            sniff=not import_def.get('standard_variant'),
        )
        logger.info('Loaded CSV with %d lines', len(rows))
        num_files_updated = import_def['import_rows'](vault, rows)
        logger.info('Updated %d files', num_files_updated)
        del rows

    # Cache statistics, mostly useful when debugging dry runs.
    num_dirty = sum(1 for f in vault.internal_file_text_cache.values() if f.is_dirty)
    logger.info('dirty files in cache: %d', num_dirty)
    logger.info(
        'clean files in cache: %d',
        len(vault.internal_file_text_cache) - num_dirty,
    )
    if not dry_run:
        vault.flush_cache()