1
0
personal-data/obsidian_import/obsidian.py

261 lines
8.0 KiB
Python

import dataclasses
import datetime
import json
import re
from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Any
import frontmatter
import marko
import marko.md_renderer
# Module-level logger, named after this module per logging convention.
logger = getLogger(__name__)

# Alias documenting strings used as frontmatter statistic keys.
StatisticKey = str
@dataclasses.dataclass(frozen=True)
class Event:
start_time: datetime.time | None
end_time: datetime.time | None
verb: str | None
subject: str | None
comment: str
def __post_init__(self):
if self.subject:
assert ':' not in self.subject
assert '/' not in self.subject
@dataclasses.dataclass(frozen=True)
class FileContents:
    """Parsed pieces of one daily-note file, in document order."""

    # YAML frontmatter key/value pairs (mutated in place by callers).
    frontmatter: dict[str, Any]
    # AST blocks appearing before the '## Events' heading.
    blocks_pre_events: list
    # Parsed entries of the events bullet list.
    events: list[Event]
    # AST blocks appearing after the events list.
    blocks_post_events: list
# Shared marko parser/renderer instances, created once at module load.
MARKDOWN_PARSER = marko.Markdown()
MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()

# Layout used when re-serializing a daily note's body; the frontmatter is
# handled separately (see ObsidianVault._save_contents).
FILE_FORMAT = """
{blocks_pre_events}
## Events
{block_events}
{blocks_post_events}
"""
class ObsidianVault:
    """Read/write access to the daily notes of an Obsidian vault.

    The daily-note folder, filename format and template are read from the
    vault's `.obsidian/daily-notes.json` configuration file.
    """

    def __init__(self, vault_path: Path, read_only: bool | str = 'silent'):
        """Open the vault rooted at `vault_path`.

        `read_only`: the string 'silent' (the default) means "log write
        attempts and ignore them"; any other value permits writes.
        """
        self.vault_path = vault_path
        # Raise explicitly (not `assert`) so the check survives `python -O`.
        if not (self.vault_path / '.obsidian').exists():
            raise ValueError('Not an Obsidian Vault')
        with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
            daily_notes_config = json.load(f)
        self.daily_folder = daily_notes_config['folder']
        self.path_format = daily_notes_config['format']
        self.template_file_path = daily_notes_config['template']
        self.read_only = read_only

    def get_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
    ) -> Any | None:
        """Return the frontmatter value for `statistic_key` on `date`, or None."""
        if contents := self._get_date_contents(date):
            return contents.frontmatter.get(statistic_key)
        return None

    def add_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
        amount: Any,
    ) -> bool:
        """Set (or, when `amount` is None, remove) a frontmatter statistic.

        Returns True when the file was actually changed.
        """
        # Decimal is not YAML-serializable; store a plain float instead.
        if isinstance(amount, Decimal):
            amount = float(amount)
        # Honour the read-only mode by logging and bailing out.
        if self.read_only == 'silent':
            logger.info(
                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
                date,
                statistic_key,
                amount,
            )
            return False
        self._create_date_if_not_present(date)
        contents = self._get_date_contents(date)
        # No-op when the stored value already matches; this also covers
        # amount=None with the key absent (get() -> None == None).
        if contents.frontmatter.get(statistic_key) == amount:
            return False
        if amount is None:
            # None removes the statistic rather than storing a YAML null.
            del contents.frontmatter[statistic_key]
        else:
            contents.frontmatter[statistic_key] = amount
        self._save_contents(date, contents)
        return True

    def add_events(self, date: datetime.date, events: list[Event]) -> bool:
        """Append `events` to the note for `date`; returns True if written."""
        if self.read_only == 'silent':
            logger.info(
                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
                date,
                events,
            )
            return False
        self._create_date_if_not_present(date)
        contents = self._get_date_contents(date)
        contents.events.extend(events)
        self._save_contents(date, contents)
        return True

    def get_events(self, date: datetime.date) -> list[Event]:
        """Return the parsed events for `date` ([] if the note is missing)."""
        contents = self._get_date_contents(date)
        if contents is None:
            return []
        return contents.events

    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
        """Load and parse the note for `date`; None when the file is absent."""
        try:
            with open(self._date_file_path(date)) as f:
                file_frontmatter = frontmatter.load(f)
        except FileNotFoundError:
            return None
        ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
        (pre_events, list_block_items, post_events) = find_events_list_block(ast)
        events = [parse_event_string(list_item) for list_item in list_block_items]
        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)

    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
        """Re-serialize `contents` and overwrite the note for `date`."""
        logger.info('Formatting file "%s"', date)
        blocks_pre_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
        )
        blocks_post_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
        )
        # Deduplicate while preserving order so repeated imports are idempotent.
        block_events = '\n'.join(
            '- ' + format_event_string(e) for e in unique(contents.events)
        )
        text = FILE_FORMAT.format(
            blocks_pre_events=blocks_pre_events,
            blocks_post_events=blocks_post_events,
            block_events=block_events,
        ).strip()
        logger.info('Saving file "%s"', date)
        # Binary mode: frontmatter.dump writes encoded bytes to the handle.
        with open(self._date_file_path(date), 'wb') as f:
            frontmatter.dump(frontmatter.Post(text, **contents.frontmatter), f)

    def _create_date_if_not_present(self, date: datetime.date):
        """Create the note for `date` from the daily template if missing."""
        date_file = self._date_file_path(date)
        if date_file.exists():
            return
        logger.info('File "%s" doesn\'t exist, creating...', date)
        with open(self._daily_template_path()) as f:
            template_text = f.read()
        with open(date_file, 'w') as f:
            f.write(template_text)

    def _date_file_path(self, date: datetime.date):
        """Map `date` to its note path using the configured YYYY/MM/DD format."""
        path = (
            self.path_format.replace('YYYY', str(date.year))
            .replace('MM', f'{date.month:02d}')
            .replace('DD', f'{date.day:02d}')
        )
        return (self.vault_path / self.daily_folder / path).with_suffix('.md')

    def _daily_template_path(self):
        """Absolute path of the configured daily-note template file."""
        return (self.vault_path / self.template_file_path).with_suffix('.md')
def find_events_list_block(ast) -> tuple[list, list[str], list]:
    """Split a document AST around its '## Events' heading.

    Returns (blocks_before_heading, event_item_texts, blocks_after_list).
    When no 'Events' heading exists, everything is "before" and the other
    two parts are empty.
    """
    blocks = ast.children
    for block_i, block in enumerate(blocks):
        if not isinstance(block, marko.block.Heading):
            continue
        # Guard against an empty heading before peeking at its text child.
        if not block.children or block.children[0].children.lower() != 'events':
            continue
        # The heading may be the final block; original code indexed
        # blocks[block_i + 1] unconditionally and raised IndexError then.
        next_block = blocks[block_i + 1] if block_i + 1 < len(blocks) else None
        if isinstance(next_block, marko.block.List):
            offset = 2  # skip both the heading and the list
            event_texts = [
                MARKDOWN_RENDERER.render_children(li).strip()
                for li in next_block.children
            ]
        else:
            offset = 1  # heading present but no event list yet
            event_texts = []
        return (blocks[:block_i], event_texts, blocks[block_i + offset:])
    return (blocks, [], [])
def format_event_string(event: Event) -> str:
    """Render an Event back into the markdown text of one bullet item."""
    assert event is not None
    # An entry with no structured fields round-trips as its raw comment.
    has_structure = (
        event.start_time is not None
        or event.end_time is not None
        or event.subject is not None
        or event.verb is not None
    )
    if not has_structure:
        return event.comment
    time_part = f'{event.start_time:%H:%M}'
    # Only show an end time when it adds information.
    if event.end_time and event.end_time != event.start_time:
        time_part += f'-{event.end_time:%H:%M}'
    return f'{time_part} | {event.verb} [[{event.subject}]]. {event.comment.strip()}'
# 'HH:MM', optionally ':SS' and optionally a fractional part, captured as one
# group (the whole string is later fed to datetime.time.fromisoformat).
# Fix: the fraction was written '(?:\.\d+?)' — mandatory and lazy — so plain
# 'HH:MM:SS' never matched the seconds branch; it is now optional and greedy.
RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+)?)?)'
# Heuristic for a past-tense verb, e.g. 'cooked', 'ate'.
RE_VERB = r'(\w+(?:ed|te))'
# Markdown link [subject](url): captures the subject, which may not contain
# ']', ':' or '/'. Fix: original had '\(?:' (an optional literal '(' followed
# by a mandatory ':'), a typo for the non-capturing group '\((?:...)\)', so
# ordinary links like [x](https://y) never matched.
RE_LINK_MD = r'\[([^\]:/]*)\]\((?:[^)]*)\)'
# Wiki link [[subject]] with the same subject restrictions.
RE_LINK_WIKI = r'\[\[([^\]:/]*)\]\]'
# One time, optionally followed by '-' and an end time (two capture groups).
RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'
def parse_event_string(event_str: str) -> Event:
    """Parse one markdown bullet into an Event.

    Tries the markdown-link form, then the wiki-link form; anything
    unparseable is logged and preserved as a comment-only Event.
    """
    # The two accepted formats differ only in the link syntax, so iterate
    # over the two link patterns instead of duplicating the whole branch.
    for link_pattern in (RE_LINK_MD, RE_LINK_WIKI):
        m = re.match(
            r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*' + RE_VERB
            + r'\s+' + link_pattern + r'\.?\s*(.*)$',
            event_str,
        )
        if m:
            start = datetime.time.fromisoformat(m.group(1))
            # A missing end time means a point event: end == start.
            end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
            return Event(start, end, m.group(3), m.group(4), m.group(5))
    logger.info('Could not parse format: %s', event_str)
    return Event(None, None, None, None, event_str)
def unique(ls: list) -> list:
    """Return `ls` with duplicates dropped, keeping first-seen order."""
    seen = set()
    result = []
    for item in ls:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result