"""Read and update the daily notes of an Obsidian vault.

A daily note consists of YAML front matter (used for per-day "statistics")
and a Markdown body that may contain an ``## Events`` section holding a
bullet list of events.
"""

import dataclasses
import datetime
import json
import re
from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Any, Literal

import frontmatter
import marko
import marko.md_renderer

logger = getLogger(__name__)

StatisticKey = str


@dataclasses.dataclass(frozen=True)
class Event:
    """A single entry of the ``## Events`` list of a daily note.

    List items that could not be parsed are represented with every field
    except ``comment`` set to ``None``; ``comment`` then holds the raw text.
    """

    start_time: datetime.time | None
    end_time: datetime.time | None
    verb: str | None
    subject: str | None
    comment: str

    def __post_init__(self):
        # ':' and '/' are excluded by the link regexes used when parsing, so a
        # subject containing them would not survive a write/read round trip.
        if self.subject:
            assert ':' not in self.subject
            assert '/' not in self.subject


@dataclasses.dataclass(frozen=True)
class FileContents:
    """Parsed representation of one daily note."""

    # Front matter metadata of the note.
    frontmatter: dict[str, Any]
    # Markdown blocks located before the "Events" heading.
    blocks_pre_events: list
    # Events parsed from the list below the "Events" heading.
    events: frozenset[Event]
    # Markdown blocks located after the events list.
    blocks_post_events: list


MARKDOWN_PARSER = marko.Markdown()
MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()

FILE_FORMAT = """
{blocks_pre_events}
## Events

{block_events}

{blocks_post_events}
"""


def _with_markdown_suffix(path: Path) -> Path:
    """Return *path* with a ``.md`` extension appended unless already present.

    ``Path.with_suffix`` is deliberately avoided: it *replaces* whatever
    follows the last dot, which truncates dotted names such as
    ``2024.01.05`` to ``2024.01.md``.
    """
    if path.suffix == '.md':
        return path
    return path.with_name(path.name + '.md')


def _event_sort_key(event: Event) -> tuple:
    """Return a total-order sort key for *event*.

    Events are ordered by start time (falling back to end time). Events
    without any time sort last instead of raising ``TypeError``, and the
    remaining fields break ties so the output order is deterministic.
    """
    time = event.start_time if event.start_time is not None else event.end_time
    return (
        time is None,
        time if time is not None else datetime.time.min,
        event.verb or '',
        event.subject or '',
        event.comment,
    )


class ObsidianVault:
    """Access to the daily notes of the Obsidian vault at ``vault_path``."""

    def __init__(
        self,
        vault_path: Path,
        read_only: bool | Literal['silent'] = 'silent',
    ):
        """Load the daily-notes configuration of the vault.

        Args:
            vault_path: Root directory of the vault; must contain ``.obsidian``.
            read_only: ``False`` allows writing. ``True`` never writes, but
                ``add_*`` methods still report whether a change would have
                been made. ``'silent'`` never writes and logs each ignored
                call; ``add_statistic`` then always returns ``False``.

        Raises:
            AssertionError: If ``vault_path`` has no ``.obsidian`` directory.
        """
        self.vault_path = vault_path

        assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'

        config_path = self.vault_path / '.obsidian' / 'daily-notes.json'
        with open(config_path, encoding='utf-8') as f:
            daily_notes_config = json.load(f)
        self.daily_folder = daily_notes_config['folder']
        self.path_format = daily_notes_config['format']
        self.template_file_path = daily_notes_config['template']
        self.read_only = read_only

    def get_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
    ) -> Any | None:
        """Return the front matter value of ``statistic_key`` for ``date``.

        Returns ``None`` when the note or the key does not exist.
        """
        if contents := self._get_date_contents(date):
            return contents.frontmatter.get(statistic_key)
        return None

    def add_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
        amount: Any,
    ) -> bool:
        """Set ``statistic_key`` to ``amount`` in the front matter of ``date``.

        Passing ``amount=None`` removes the key.

        Returns:
            ``True`` if the stored value differs from ``amount`` (the note is
            rewritten unless the vault is read-only), ``False`` otherwise.
        """
        # Decimal values are stored as floats (presumably because the front
        # matter serializer cannot represent Decimal -- TODO confirm).
        if isinstance(amount, Decimal):
            amount = float(amount)

        if self.read_only == 'silent':
            logger.info(
                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
                date,
                statistic_key,
                amount,
            )
            return False

        # Honour read_only=True the same way add_events does: never touch disk.
        if not self.read_only:
            self._create_date_if_not_present(date)
        contents = self._get_date_contents(date)
        if contents is None:
            return False

        # Exit without writing if there were no changes.
        if contents.frontmatter.get(statistic_key) == amount:
            return False

        if amount is None:
            contents.frontmatter.pop(statistic_key, None)
        else:
            contents.frontmatter[statistic_key] = amount

        if not self.read_only:
            self._save_contents(date, contents)
        return True

    def add_events(self, date: datetime.date, events: list[Event]) -> bool:
        """Merge ``events`` into the events list of the note for ``date``.

        Returns:
            ``True`` if at least one event was not already present (the note
            is rewritten unless the vault is read-only), ``False`` otherwise
            or when the note does not exist and may not be created.
        """
        if self.read_only == 'silent':
            logger.info(
                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
                date,
                events,
            )

        if not self.read_only:
            self._create_date_if_not_present(date)

        contents = self._get_date_contents(date)
        if contents is None:
            return False

        # Exit without writing if there were no changes.
        updated_events: frozenset[Event] = contents.events | set(events)
        if contents.events == updated_events:
            return False

        contents = dataclasses.replace(contents, events=updated_events)
        if not self.read_only:
            self._save_contents(date, contents)
        return True

    def get_events(self, date: datetime.date) -> frozenset[Event]:
        """Return the events of ``date``; empty if the note does not exist."""
        contents = self._get_date_contents(date)
        if contents is None:
            return frozenset()
        return contents.events

    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
        """Load and parse the note for ``date``; ``None`` if it is missing."""
        try:
            # Notes are saved as UTF-8 by _save_contents, so read them as such.
            with open(self._date_file_path(date), encoding='utf-8') as f:
                file_frontmatter = frontmatter.load(f)
        except FileNotFoundError:
            return None

        ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
        (pre_events, list_block_items, post_events) = find_events_list_block(ast)
        events = frozenset(
            parse_event_string(list_item) for list_item in list_block_items
        )
        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)

    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
        """Render ``contents`` and overwrite the note for ``date``."""
        logger.info('Formatting file "%s"', date)
        blocks_pre_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
        )
        blocks_post_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
        )

        # The key tolerates events without times (unparsed list items).
        events = sorted(contents.events, key=_event_sort_key)
        block_events = '\n'.join('- ' + format_event_string(e) for e in events)
        text = FILE_FORMAT.format(
            blocks_pre_events=blocks_pre_events,
            blocks_post_events=blocks_post_events,
            block_events=block_events,
        ).strip()

        # Assign metadata after construction so that front matter keys named
        # like Post's own parameters cannot collide with them.
        post = frontmatter.Post(text)
        post.metadata.update(contents.frontmatter)

        logger.info('Saving file "%s"', date)
        with open(self._date_file_path(date), 'wb') as f:
            frontmatter.dump(post, f)

    def _create_date_if_not_present(self, date: datetime.date):
        """Create the note for ``date`` from the daily template if missing."""
        date_file = self._date_file_path(date)
        if date_file.exists():
            return
        logger.info('File "%s" doesn\'t exist, creating...', date)
        with open(self._daily_template_path(), encoding='utf-8') as f:
            template_text = f.read()
        date_file.parent.mkdir(exist_ok=True, parents=True)
        with open(date_file, 'w', encoding='utf-8') as f:
            f.write(template_text)

    def _date_file_path(self, date: datetime.date):
        """Return the path of the note for ``date``.

        Only the ``YYYY``, ``MM`` and ``DD`` tokens of the configured format
        are substituted.
        """
        path = (
            self.path_format.replace('YYYY', str(date.year))
            .replace('MM', f'{date.month:02d}')
            .replace('DD', f'{date.day:02d}')
        )
        return _with_markdown_suffix(self.vault_path / self.daily_folder / path)

    def _daily_template_path(self):
        """Return the path of the configured daily-note template."""
        return _with_markdown_suffix(self.vault_path / self.template_file_path)


def _is_events_heading(block) -> bool:
    """Return whether *block* is a heading whose leading text is "events"."""
    if not isinstance(block, marko.block.Heading) or not block.children:
        return False
    text = block.children[0].children
    # Only plain text has string children; e.g. emphasis holds a list.
    return isinstance(text, str) and text.lower() == 'events'


def find_events_list_block(ast) -> tuple[list, list[str], list]:
    """Split a parsed document around its "Events" section.

    Returns:
        ``(blocks_before_heading, event_texts, blocks_after_list)``. If the
        heading is not followed by a list, ``event_texts`` is empty and the
        third element holds everything after the heading. If there is no
        "Events" heading at all, all blocks are returned as the first element.
    """
    blocks = ast.children
    for block_i, block in enumerate(blocks):
        if not _is_events_heading(block):
            continue

        # Skip blank lines separating the heading from its list, and stay in
        # bounds when the heading is the last block of the document.
        list_i = block_i + 1
        while list_i < len(blocks) and isinstance(
            blocks[list_i], marko.block.BlankLine
        ):
            list_i += 1

        if list_i < len(blocks) and isinstance(blocks[list_i], marko.block.List):
            event_texts = [
                MARKDOWN_RENDERER.render_children(li).strip()
                for li in blocks[list_i].children
            ]
            return (blocks[:block_i], event_texts, blocks[list_i + 1 :])

        return (blocks[:block_i], [], blocks[block_i + 1 :])
    return (blocks, [], [])


def format_event_string(event: Event) -> str:
    """Render *event* as the text of one events-list item.

    Unparsed events (all structured fields ``None``) are rendered as their
    raw comment. Structured events are rendered as
    ``HH:MM[-HH:MM] | verb [[subject]]. comment``.

    Raises:
        TypeError: If a structured event has neither time, or lacks a verb
            or subject.
    """
    assert event is not None
    if (
        event.start_time is None
        and event.end_time is None
        and event.subject is None
        and event.verb is None
    ):
        return event.comment

    # Fall back to the end time, mirroring the ordering used when saving.
    start = event.start_time if event.start_time is not None else event.end_time

    buf = [f'{start:%H:%M}']
    if event.end_time is not None and event.end_time != start:
        buf.append(f'-{event.end_time:%H:%M}')
    buf.append(' | ')
    buf.append(event.verb)
    buf.append(' [[')
    buf.append(event.subject)
    buf.append(']].')
    # rstrip keeps the separating space but drops it when the comment is empty.
    buf.append((' ' + event.comment).rstrip())

    return ''.join(buf)


# HH:MM with optional :SS and optional fractional seconds.
RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+)?)?)'
RE_VERB = r'(\w+(?:ed|te))'
# Markdown link "[text](target)"; only the text is captured.
RE_LINK_MD = r'\[([^\]:/]*)\]\([^)]*\)'
# Wiki link "[[text]]".
RE_LINK_WIKI = r'\[\[([^\]:/]*)\]\]'

RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'

# One compiled pattern per supported link syntax, tried in this order.
# Groups: 1 = start time, 2 = end time, 3 = verb, 4 = subject, 5 = comment.
_EVENT_PATTERNS = tuple(
    re.compile(
        r'^\s*'
        + RE_TIME_FORMAT
        + r'[ :\|-]*'
        + RE_VERB
        + r'\s+'
        + link_pattern
        + r'\.?\s*(.*)$',
    )
    for link_pattern in (RE_LINK_MD, RE_LINK_WIKI)
)


def parse_event_string(event_str: str) -> Event:
    """Parse the text of one events-list item into an :class:`Event`.

    Text that does not match the expected format, or whose times are not
    valid, is returned as an event with only ``comment`` set.
    """
    for pattern in _EVENT_PATTERNS:
        m = pattern.match(event_str)
        if m is None:
            continue
        try:
            start = datetime.time.fromisoformat(m.group(1))
            end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
        except ValueError:
            # Looks like a time but is out of range (e.g. "25:99"): keep the
            # item as free text instead of failing to load the whole note.
            break
        return Event(start, end, m.group(3), m.group(4), m.group(5))

    logger.info('Could not parse format: %s', event_str)
    return Event(None, None, None, None, event_str)