# personal-data/obsidian_import/obsidian.py
import dataclasses
import datetime
import json
import re
from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Any

import frontmatter
import marko
import marko.md_renderer

logger = getLogger(__name__)

StatisticKey = str


@dataclasses.dataclass(frozen=True, order=True)
class Event:
    start_time: datetime.time | None
    end_time: datetime.time | None
    verb: str | None
    subject: str | None
    comment: str

    def __post_init__(self):
        if self.subject:
            assert ':' not in self.subject
            assert '/' not in self.subject


@dataclasses.dataclass(frozen=True)
class FileContents:
    frontmatter: dict[str, Any]
    blocks_pre_events: list
    events: frozenset[Event]
    blocks_post_events: list


MARKDOWN_PARSER = marko.Markdown()
MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()
FILE_FORMAT = """
{blocks_pre_events}
## Events
{block_events}
{blocks_post_events}
"""
MIDNIGHT = datetime.time(0, 0, 0)
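
# ObsidianVault reads the Daily Notes core plugin configuration from
# .obsidian/daily-notes.json inside the vault. A typical configuration
# (values below are illustrative, not taken from this repository) looks like:
#
#     {"folder": "daily", "format": "YYYY-MM-DD", "template": "templates/daily"}
#
# where 'format' uses the YYYY/MM/DD tokens substituted in _date_file_path.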


class ObsidianVault:
    def __init__(self, vault_path: Path, read_only: bool | str = 'silent'):
        self.vault_path = vault_path
        assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
        with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
            daily_notes_config = json.load(f)
        self.daily_folder = daily_notes_config['folder']
        self.path_format = daily_notes_config['format']
        self.template_file_path = daily_notes_config['template']
        self.read_only = read_only

    def get_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
    ) -> Any | None:
        if contents := self._get_date_contents(date):
            return contents.frontmatter.get(statistic_key)
        return None

    def add_statistic(
        self,
        date: datetime.date,
        statistic_key: StatisticKey,
        amount: Any,
    ) -> bool:
        # Adjust arguments
        if isinstance(amount, Decimal):
            amount = float(amount)

        # Check for silent
        if self.read_only == 'silent':
            logger.info(
                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
                date,
                statistic_key,
                amount,
            )
            return False

        # Load contents
        self._create_date_if_not_present(date)
        contents = self._get_date_contents(date)

        # Update contents
        if contents.frontmatter.get(statistic_key) == amount:
            return False
        contents.frontmatter[statistic_key] = amount
        if amount is None:
            del contents.frontmatter[statistic_key]

        # Save contents
        self._save_contents(date, contents)
        return True

    def add_events(self, date: datetime.date, events: list[Event]) -> bool:
        if self.read_only == 'silent':
            logger.info(
                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
                date,
                events,
            )

        if not self.read_only:
            self._create_date_if_not_present(date)
        contents = self._get_date_contents(date)
        if contents is None:
            return False

        # Exit without writing if there were no changes.
        updated_events: frozenset[Event] = contents.events | set(events)
        if contents.events == updated_events:
            return False

        contents = dataclasses.replace(contents, events=updated_events)
        if not self.read_only:
            self._save_contents(date, contents)
        return True

    def get_events(self, date: datetime.date) -> frozenset[Event]:
        contents = self._get_date_contents(date)
        if contents is None:
            return frozenset()
        return contents.events

    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
        try:
            with open(self._date_file_path(date)) as f:
                file_frontmatter = frontmatter.load(f)
        except FileNotFoundError:
            return None

        ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
        (pre_events, list_block_items, post_events) = find_events_list_block(ast)
        events = frozenset(
            parse_event_string(list_item) for list_item in list_block_items
        )
        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)

    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
        logger.info('Formatting file "%s"', date)
        blocks_pre_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
        )
        blocks_post_events = ''.join(
            MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
        )
        events = list(contents.events)
        events.sort()
        events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
        block_events = '\n'.join('- ' + format_event_string(e) for e in events)
        text = FILE_FORMAT.format(
            blocks_pre_events=blocks_pre_events,
            blocks_post_events=blocks_post_events,
            block_events=block_events,
        ).strip()

        logger.info('Saving file "%s"', date)
        with open(self._date_file_path(date), 'wb') as f:
            frontmatter.dump(frontmatter.Post(text, **contents.frontmatter), f)

    def _create_date_if_not_present(self, date: datetime.date):
        date_file = self._date_file_path(date)
        if date_file.exists():
            return
        logger.info('File "%s" doesn\'t exist, creating...', date)
        with open(self._daily_template_path()) as f:
            template_text = f.read()
        date_file.parent.mkdir(exist_ok=True, parents=True)
        with open(date_file, 'w') as f:
            f.write(template_text)

    def _date_file_path(self, date: datetime.date):
        path = (
            self.path_format.replace('YYYY', str(date.year))
            .replace('MM', f'{date.month:02d}')
            .replace('DD', f'{date.day:02d}')
        )
        return (self.vault_path / self.daily_folder / path).with_suffix('.md')

    def _daily_template_path(self):
        return (self.vault_path / self.template_file_path).with_suffix('.md')
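

# A minimal usage sketch (the vault path and statistic key below are
# hypothetical, not part of this module): open a vault in silent read-only
# mode, inspect a day's events, and record a statistic in its frontmatter.
#
#     vault = ObsidianVault(Path.home() / 'Notes', read_only='silent')
#     today = datetime.date.today()
#     for event in vault.get_events(today):
#         print(format_event_string(event))
#     vault.add_statistic(today, 'cycling_distance_km', 17.4)  # no-op while read-only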


def find_events_list_block(ast) -> tuple[list, list[str], list]:
    blocks = ast.children
    for block_i, block in enumerate(blocks):
        if (
            isinstance(block, marko.block.Heading)
            and block.children[0].children.lower() == 'events'
        ):
            events_block = ast.children[block_i + 1]
            if isinstance(events_block, marko.block.List):
                offset = 2
                event_texts = [
                    MARKDOWN_RENDERER.render_children(li).strip()
                    for li in events_block.children
                ]
            else:
                offset = 1
                event_texts = []
            return (blocks[:block_i], event_texts, blocks[block_i + offset :])
    return (blocks, [], [])
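

# find_events_list_block above looks for a heading whose text is 'Events'
# (case-insensitive) followed directly by a bullet list, e.g. (illustrative):
#
#     ## Events
#     - 10:00-11:30 | watched [[Some Movie]]. Great soundtrack.
#     - 19:00 | cooked [[Dinner]].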


def format_event_string(event: Event) -> str:
    assert event is not None
    if (
        event.start_time is None
        and event.end_time is None
        and event.subject is None
        and event.verb is None
    ):
        return event.comment

    buf = []
    buf.append(f'{event.start_time:%H:%M}')
    if event.end_time and event.end_time != event.start_time:
        buf.append(f'-{event.end_time:%H:%M}')
    buf.append(' | ')
    buf.append(event.verb)
    buf.append(' [[')
    buf.append(event.subject)
    buf.append((']]. ' + event.comment).strip())
    return ''.join(buf)


RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+)?)?)'  # HH:MM, optionally with (fractional) seconds
RE_VERB = r'(\w+(?:ed|te))'
RE_LINK_MD = r'\[([^\]:/]*)\]\([^)]*\)'  # [subject](target); only the subject is captured
RE_LINK_WIKI = r'\[\[([^\]:/]*)\]\]'
RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'


def parse_event_string(event_str: str) -> Event:
    if m := re.match(
        r'^\s*'
        + RE_TIME_FORMAT
        + r'[ :\|-]*'
        + RE_VERB
        + r'\s+'
        + RE_LINK_MD
        + r'\.?\s*(.*)$',
        event_str,
    ):
        start = datetime.time.fromisoformat(m.group(1))
        end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
        return Event(start, end, m.group(3), m.group(4), m.group(5))

    if m := re.match(
        r'^\s*'
        + RE_TIME_FORMAT
        + r'[ :\|-]*'
        + RE_VERB
        + r'\s+'
        + RE_LINK_WIKI
        + r'\.?\s*(.*)$',
        event_str,
    ):
        start = datetime.time.fromisoformat(m.group(1))
        end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
        return Event(start, end, m.group(3), m.group(4), m.group(5))

    logger.info('Could not parse format: %s', event_str)
    return Event(None, None, None, None, event_str)
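

# A small self-contained sketch of the round trip between parse_event_string
# and format_event_string; the event line below is made up for illustration.
if __name__ == '__main__':
    example_line = '10:00-11:30 | watched [[Some Movie]]. Great soundtrack.'
    event = parse_event_string(example_line)
    print(event)
    print(format_event_string(event))  # prints the same line again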