From d9f8047be4f98b7d2cf4db3dcc62088e9f061107 Mon Sep 17 00:00:00 2001
From: Jon Michael Aanes
Date: Tue, 8 Oct 2024 22:57:41 +0200
Subject: [PATCH] ObsidianVault now supports events

---
 obsidian_import/__init__.py |  34 ++++++---
 obsidian_import/obsidian.py | 135 ++++++++++++++++++++++++++++++------
 2 files changed, 138 insertions(+), 31 deletions(-)

diff --git a/obsidian_import/__init__.py b/obsidian_import/__init__.py
index e6377db..d116f6d 100644
--- a/obsidian_import/__init__.py
+++ b/obsidian_import/__init__.py
@@ -10,7 +10,7 @@ from typing import Any
 
 from personal_data.util import load_csv_file
 
-from .obsidian import ObsidianVault
+from .obsidian import ObsidianVault, Event
 
 logger = getLogger(__name__)
 
@@ -65,14 +65,32 @@ def import_step_counts_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> i
 
     return num_updated
 
+def import_played_dates_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> int:
+    date = datetime.date(2024,10,9)
+    event = Event(datetime.time(12,00), datetime.time(12,00), 'Tested', 'Obsidian Import')
+    updated = vault.add_event(date, event)
+
+    return 1 if updated else 0
+
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
 
-    #data_path = Path('/home/jmaa/Notes/workout.csv')
-    data_path = Path('/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv')
-    rows = load_csv_file(data_path)
-    logger.info('Loaded CSV with %d lines', len(rows))
-    #num_updated = import_workout_csv(vault, rows)
-    num_updated = import_step_counts_csv(vault, rows)
+    if False:
+        data_path = Path('/home/jmaa/Notes/workout.csv')
+        rows = load_csv_file(data_path)
+        logger.info('Loaded CSV with %d lines', len(rows))
+        num_updated = import_workout_csv(vault, rows)
+        logger.info('Updated %d files', num_updated)
+
+    if False:
+        data_path = Path('/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv')
+        rows = load_csv_file(data_path)
+
+        logger.info('Loaded CSV with %d lines', len(rows))
+        num_updated = import_step_counts_csv(vault, rows)
+        logger.info('Updated %d files', num_updated)
+
+    if True:
+        num_updated = import_played_dates_csv(vault, []) # TODO
+        logger.info('Updated %d files', num_updated)
+
-    logger.info('Updated %d files', num_updated)
diff --git a/obsidian_import/obsidian.py b/obsidian_import/obsidian.py
index fd82344..0aa873d 100644
--- a/obsidian_import/obsidian.py
+++ b/obsidian_import/obsidian.py
@@ -1,5 +1,9 @@
 import datetime
 import json
+import re
+import marko
+import marko.md_renderer
+import dataclasses
 from decimal import Decimal
 from logging import getLogger
 from pathlib import Path
@@ -11,6 +15,29 @@ logger = getLogger(__name__)
 
 StatisticKey = str
 
+@dataclasses.dataclass(frozen=True)
+class Event:
+    start_time: datetime.time | None
+    end_time: datetime.time | None
+    verb: str
+    subject: str
+
+@dataclasses.dataclass(frozen=True)
+class FileContents:
+    frontmatter: dict[str, Any]
+    blocks_pre_events: list
+    events: list[Event]
+    blocks_post_events: list
+
+MARKDOWN_PARSER = marko.Markdown()
+MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()
+
+FILE_FORMAT = '''
+{blocks_pre_events}
+## Events
+{block_events}
+{blocks_post_events}
+'''
 
 class ObsidianVault:
     def __init__(self, vault_path: Path, read_only: bool = 'silent'):
@@ -28,55 +55,87 @@ class ObsidianVault:
     def get_statistic(
         self, date: datetime.date, statistic_key: StatisticKey,
     ) -> Any | None:
-        try:
-            with open(self._date_file_path(date)) as f:
-                data = frontmatter.load(f)
-        except FileNotFoundError:
-            return None
-
-        return data.metadata.get(statistic_key)
+        if contents := self._get_date_contents(date):
+            return contents.frontmatter.get(statistic_key)
+        return None
 
     def add_statistic(
         self, date: datetime.date, statistic_key: StatisticKey, amount: Any,
     ) -> bool:
+        # Adjust arguments
+        if isinstance(amount, Decimal):
+            amount = float(amount)
+
+        # Check for silent
         if self.read_only == 'silent':
             logger.info(
-                'Real only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
+                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
                 date,
                 statistic_key,
                 amount,
             )
             return False
 
+        # Load contents
         self._create_date_if_not_present(date)
+        contents = self._get_date_contents(date)
 
-        with open(self._date_file_path(date)) as f:
-            data = frontmatter.load(f)
-
-        if isinstance(amount, Decimal):
-            amount = float(amount)
-
-        if data.metadata.get(statistic_key) == amount:
+        # Update contents
+        if contents.frontmatter.get(statistic_key) == amount:
             return False
 
-        data.metadata[statistic_key] = amount
+        contents.frontmatter[statistic_key] = amount
         if amount is None:
-            del data.metadata[statistic_key]
-
-        with open(self._date_file_path(date), 'wb') as f:
-            frontmatter.dump(data, f)
+            del contents.frontmatter[statistic_key]
 
+        # Save contents
+        self._save_contents(date, contents)
         return True
 
-    def add_event(self, date: datetime.date, verb: str, subject: str) -> None:
+    def add_event(self, date: datetime.date, event: Event) -> bool:
         if self.read_only == 'silent':
             logger.info(
-                'Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb,
+                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)', date, event,
             )
             return
 
         self._create_date_if_not_present(date)
-        # TODO
+        contents = self._get_date_contents(date)
+        if event in contents.events:
+            logger.info('Event already exists in "%s"', date)
+            return False
+        contents.events.append(event)
+
+        self._save_contents(date, contents)
+        return True
+
+    def get_events(self, date: datetime.date) -> list[Event]:
+        contents = self._get_date_contents(date)
+        if contents is None:
+            return []
+        return contents.events
+
+    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
+        try:
+            with open(self._date_file_path(date)) as f:
+                file_frontmatter = frontmatter.load(f)
+        except FileNotFoundError:
+            return None
+
+        ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
+        (pre_events, list_block_items, post_events) = find_events_list_block(ast)
+        events = [parse_event_string(list_item) for list_item in list_block_items]
+        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
+
+    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
+        blocks_pre_events = ''.join(MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events)
+        blocks_post_events = ''.join(MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events)
+        block_events = '\n'.join('- ' + format_event_string(e) for e in unique(contents.events))
+        text = FILE_FORMAT.format(blocks_pre_events=blocks_pre_events, blocks_post_events=blocks_post_events, block_events=block_events).strip()
+
+        logger.info('Saving file "%s"', date)
+        with open(self._date_file_path(date), 'wb') as f:
+            frontmatter.dump(frontmatter.Post(text, **contents.frontmatter), f)
 
     def _create_date_if_not_present(self, date: datetime.date):
         date_file = self._date_file_path(date)
@@ -98,3 +157,33 @@
 
     def _daily_template_path(self):
         return (self.vault_path / self.template_file_path).with_suffix('.md')
+
+def find_events_list_block(ast) -> tuple[list, list[str], list]:
+    blocks = ast.children
+    for block_i, block in enumerate(blocks):
+        if isinstance(block, marko.block.Heading) and block.children[0].children.lower() == 'events':
+            events_block = ast.children[block_i+1]
+            if isinstance(events_block, marko.block.List):
+                offset = 2
+                event_texts = [MARKDOWN_RENDERER.render_children(li).strip() for li in events_block.children]
+            else:
+                offset = 1
+                event_texts = []
+
+            return (blocks[:block_i], event_texts, blocks[block_i+offset:])
+    return (blocks, [], [])
+
+def format_event_string(event: Event) -> str:
+    return f'{event.start_time}: {event.verb} [[{event.subject}]]'
+
+def parse_event_string(event_str: str) -> Event | None:
+    if m := re.match(r'^\s*(\d\d:\d\d(?::\d\d)?):?\s+(\w+ed)\s+\[([^\]]*)\]\([^)]*\)\.?\s*$', event_str):
+        start = datetime.time.fromisoformat(m.group(1))
+        return Event(start, start, m.group(2), m.group(3))
+    if m := re.match(r'^\s*(\d\d:\d\d(?::\d\d)?):?\s+(\w+ed)\s+\[\[([^\]]*)\]\]$', event_str):
+        start = datetime.time.fromisoformat(m.group(1))
+        return Event(start, start, m.group(2), m.group(3))
+    return None
+
+def unique(ls: list) -> list:
+    return list(dict.fromkeys(ls))