"""Obsidian vault adapter for daily notes.

Reads and writes per-day events and frontmatter statistics in the daily-notes
folder of an Obsidian vault.  File contents are cached in memory and only
written back to disk when flush_cache() is called.
"""

import dataclasses
import datetime
import json
import re
from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Any

import frontmatter
import marko
import marko.md_renderer

logger = getLogger(__name__)

StatisticKey = str


@dataclasses.dataclass(frozen=True, order=True)
class Event:
    start_time: datetime.time | None
    end_time: datetime.time | None
    verb: str | None
    subject: str | None
    comment: str

    def __post_init__(self):
        if self.subject:
            # Colons and slashes would break the wiki-link and path syntax.
            assert ':' not in self.subject
            assert '/' not in self.subject


@dataclasses.dataclass(frozen=True)
class FileContents:
    frontmatter: dict[str, Any]
    blocks_pre_events: list
    events: frozenset[Event]
    blocks_post_events: list


@dataclasses.dataclass(frozen=False)
class CachedFile:
    data: bytes | None
    is_dirty: bool


MARKDOWN_PARSER = marko.Markdown()
MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()

FILE_FORMAT = """
{blocks_pre_events}
## Events
{block_events}
{blocks_post_events}
"""

MIDNIGHT = datetime.time(0, 0, 0)


class ObsidianVault:
    """Read/write access to the daily notes of a single Obsidian vault."""

    def __init__(self, vault_path: Path, read_only: bool | str = 'silent'):
        self.vault_path = vault_path
        assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'

        with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
            daily_notes_config = json.load(f)
        self.daily_folder = daily_notes_config['folder']
        self.path_format = daily_notes_config['format']
        self.template_file_path = daily_notes_config['template']

        # Any truthy value (including the default 'silent') marks the vault
        # read-only; flush_cache() will refuse to write.
        self.read_only = read_only
        self.internal_file_text_cache: dict[Path, CachedFile] = {}

    def get_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
    ) -> Any | None:
        if contents := self._load_date_contents(date):
            return contents.frontmatter.get(statistic_key)
        return None

    def add_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
        amount: Any,
    ) -> bool:
        # Adjust arguments
        if isinstance(amount, Decimal):
            amount = float(amount)

        # Load contents
        contents = self._load_date_contents(date)

        # Update contents
        if contents.frontmatter.get(statistic_key) == amount:
            return False

        contents.frontmatter[statistic_key] = amount
        if amount is None:
            del contents.frontmatter[statistic_key]

        # Save contents
        self._save_date_contents(date, contents)
        return True

    def add_events(self, date: datetime.date, events: list[Event]) -> bool:
        contents = self._load_date_contents(date)
        if contents is None:
            return False

        # Exit without writing if there were no changes.
        updated_events: frozenset[Event] = contents.events | set(events)
        if contents.events == updated_events:
            return False

        contents = dataclasses.replace(contents, events=updated_events)
        self._save_date_contents(date, contents)
        return True

    def get_events(self, date: datetime.date) -> frozenset[Event]:
        contents = self._load_date_contents(date)
        if contents is None:
            return frozenset()
        return contents.events

    def _load_date_contents(self, date: datetime.date) -> FileContents | None:
        file_path = self._date_file_path(date)
        # Fall back to the configured daily-note template for missing days.
        text = self._load_file_text(file_path) or self._load_file_text(
            self._daily_template_path(),
        )
        assert text is not None

        file_frontmatter = frontmatter.loads(text)
        ast = MARKDOWN_PARSER.parse(file_frontmatter.content)
        (pre_events, list_block_items, post_events) = find_events_list_block(ast)
        events = frozenset(
            parse_event_string(list_item) for list_item in list_block_items
        )
        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)

    def _save_date_contents(self, date: datetime.date, contents: FileContents) -> None:
        blocks_pre_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
        )
        blocks_post_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
        )

        # Stable sorts applied from least to most significant key: events end
        # up ordered by time, then verb, then subject.
        events = list(contents.events)
        events.sort(key=lambda x: x.subject or '')
        events.sort(key=lambda x: x.verb or '')
        events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
        block_events = '\n'.join('- ' + format_event_string(e) for e in events)

        text = FILE_FORMAT.format(
            blocks_pre_events=blocks_pre_events,
            blocks_post_events=blocks_post_events,
            block_events=block_events,
        ).strip()
        if contents.frontmatter:
            # Re-attach the frontmatter so updated statistics survive the save.
            text = frontmatter.dumps(frontmatter.Post(text, **contents.frontmatter))
        self._save_file_text_to_cache(self._date_file_path(date), text.encode('utf8'))

    def _save_file_text_to_cache(self, path: Path, text: bytes) -> None:
        if path not in self.internal_file_text_cache:
            self.internal_file_text_cache[path] = CachedFile(None, False)
        self.internal_file_text_cache[path].data = text
        self.internal_file_text_cache[path].is_dirty = True

    def _date_file_path(self, date: datetime.date) -> Path:
        path = (
            self.path_format.replace('YYYY', str(date.year))
            .replace('MM', f'{date.month:02d}')
            .replace('DD', f'{date.day:02d}')
        )
        return (self.vault_path / self.daily_folder / path).with_suffix('.md')

    def _daily_template_path(self) -> Path:
        return (self.vault_path / self.template_file_path).with_suffix('.md')

    def _load_file_text(self, path: Path) -> bytes | None:
        if path not in self.internal_file_text_cache:
            try:
                with open(path, 'rb') as f:
                    self.internal_file_text_cache[path] = CachedFile(f.read(), False)
            except FileNotFoundError:
                return None
        return self.internal_file_text_cache[path].data

    def flush_cache(self) -> None:
        if self.read_only:
            msg = 'Read-only ObsidianVault cannot be flushed'
            raise RuntimeError(msg)
        for path, cached_file in self.internal_file_text_cache.items():
            if cached_file.is_dirty:
                logger.info('Saving file "%s"', path)
                path.parent.mkdir(exist_ok=True, parents=True)
                with open(path, 'wb') as f:
                    f.write(cached_file.data)
                # Mark the entry clean so repeated flushes skip it.
                cached_file.is_dirty = False


def find_events_list_block(ast) -> tuple[list, list[str], list]:
    blocks = ast.children
    for block_i, block in enumerate(blocks):
        if (
            isinstance(block, marko.block.Heading)
            and block.children[0].children.lower() == 'events'
        ):
            # Guard against a trailing heading with nothing after it.
            events_block = blocks[block_i + 1] if block_i + 1 < len(blocks) else None
            if isinstance(events_block, marko.block.List):
                offset = 2  # Skip both the heading and its list.
                event_texts = [
                    MARKDOWN_RENDERER.render_children(li).strip()
                    for li in events_block.children
                ]
            else:
                offset = 1  # Heading with no list yet: skip only the heading.
                event_texts = []
            return (blocks[:block_i], event_texts, blocks[block_i + offset :])
    return (blocks, [], [])
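
# Worked example (illustrative; the note text below is made up).  Given a body
# where the list directly follows the heading:
#
#     ast = MARKDOWN_PARSER.parse('## Events\n- 09:00 | Reviewed [[Inbox]]. Triage\n')
#     pre, items, post = find_events_list_block(ast)
#
# 'pre' and 'post' hold the untouched surrounding blocks, while
# items == ['09:00 | Reviewed [[Inbox]]. Triage'] is what gets fed to
# parse_event_string() below.
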
def format_event_string(event: Event) -> str:
    """Render an event in the canonical 'HH:MM | Verb [[Subject]]. Comment' form.

    Assumes a structured event carries start_time, verb, and subject; events
    that are all-None except for the comment round-trip verbatim.
    """
    assert event is not None
    if (
        event.start_time is None
        and event.end_time is None
        and event.subject is None
        and event.verb is None
    ):
        return event.comment

    buf = []
    buf.append(f'{event.start_time:%H:%M}')
    if event.end_time and event.end_time != event.start_time:
        buf.append(f'-{event.end_time:%H:%M}')
    buf.append(' | ')
    buf.append(event.verb)
    buf.append(' [[')
    buf.append(event.subject)
    buf.append((']]. ' + event.comment).strip())
    return ''.join(buf)


RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+)?)?)'
RE_VERB = r'(\w+(?:ed|te))'
RE_LINK_MD = r'\[([^\]:/]*)\]\([^)]*\)'
RE_LINK_WIKI = r'\[\[([^\]:/]*)\]\]'

RE_TIME_FORMAT = RE_TIME + r'(?:\s*-\s*' + RE_TIME + r')?'


def parse_event_string(event_str: str) -> Event:
    """Parse one event line; unparseable lines become comment-only events."""
    # Try the markdown-link form first, then the wiki-link form.  Both link
    # patterns capture exactly one group, so the group numbering is identical.
    for link_pattern in (RE_LINK_MD, RE_LINK_WIKI):
        if m := re.match(
            r'^\s*' + RE_TIME_FORMAT + r'[ :|-]*' + RE_VERB + r'\s+'
            + link_pattern + r'\.?\s*(.*)$',
            event_str,
        ):
            start = datetime.time.fromisoformat(m.group(1))
            end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
            return Event(start, end, m.group(3), m.group(4), m.group(5))

    logger.info('Could not parse format: %s', event_str)
    return Event(None, None, None, None, event_str)
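
# Minimal self-check sketch (illustrative, not part of the module API): the
# parser and the formatter should round-trip the canonical single-time form.
if __name__ == '__main__':
    line = '09:00 | Reviewed [[Inbox]]. Morning triage'
    event = parse_event_string(line)
    assert event == Event(
        datetime.time(9, 0), datetime.time(9, 0), 'Reviewed', 'Inbox', 'Morning triage'
    )
    # The end time defaulted to the start time, so it is omitted when formatting.
    assert format_event_string(event) == line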