
Compare commits


No commits in common. "30bbab284142aade64432c478e1a4f3aa2f6e0ee" and "57918ec9b9e7876ee2126cff4a15fe6882c6df81" have entirely different histories.

8 changed files with 68 additions and 152 deletions

View File

@@ -41,9 +41,7 @@ def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
     for event_data in rows:
         (start_at, end_at) = start_end(event_data, possible_keys)
-        labels = [
-            Label(k, event_data.get(k)) for k in possible_keys.misc if k in event_data
-        ]
+        labels = [Label(k, event_data[k]) for k in possible_keys.misc]

         # Create event
         yield ActivitySample(
@@ -245,12 +243,3 @@ def import_data(obsidian_path: Path, dry_run=True):
     import_watched_series_csv_from_file(vault)
     import_played_games_csv_from_file(vault)

-    num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
-    logger.info('dirty files in cache: %d', num_dirty)
-    logger.info(
-        'clean files in cache: %d',
-        len(vault.internal_file_text_cache) - num_dirty,
-    )
-
-    if not dry_run:
-        vault.flush_cache()

View File

@@ -38,12 +38,6 @@ class FileContents:
     blocks_post_events: list


-@dataclasses.dataclass(frozen=False)
-class CachedFile:
-    data: bytes
-    is_dirty: bool
-
-
MARKDOWN_PARSER = marko.Markdown()
MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()
@@ -58,35 +52,24 @@ MIDNIGHT = datetime.time(0, 0, 0)
 class ObsidianVault:
-    def __init__(
-        self,
-        vault_path: Path,
-        read_only: bool = 'silent',
-        allow_invalid_vault=False,
-    ):
+    def __init__(self, vault_path: Path, read_only: bool = 'silent'):
         self.vault_path = vault_path
-        self.read_only = read_only
-        self.internal_file_text_cache: dict[Path, CachedFile] = {}
-        if not allow_invalid_vault:
-            assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
-        try:
-            with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
-                daily_notes_config = json.load(f)
-            self.daily_folder = daily_notes_config['folder']
-            self.path_format = daily_notes_config['format']
-            self.template_file_path = daily_notes_config['template']
-        except FileNotFoundError:
-            if not allow_invalid_vault:
-                assert False, 'Missing daily notes configuration!'
+        assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
+
+        with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
+            daily_notes_config = json.load(f)
+        self.daily_folder = daily_notes_config['folder']
+        self.path_format = daily_notes_config['format']
+        self.template_file_path = daily_notes_config['template']
+        self.read_only = read_only

     def get_statistic(
         self,
         date: datetime.date,
         statistic_key: StatisticKey,
     ) -> Any | None:
-        if contents := self._load_date_contents(date):
+        if contents := self._get_date_contents(date):
             return contents.frontmatter.get(statistic_key)
         return None
@@ -100,8 +83,19 @@ class ObsidianVault:
         if isinstance(amount, Decimal):
             amount = float(amount)

+        # Check for silent
+        if self.read_only == 'silent':
+            logger.info(
+                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
+                date,
+                statistic_key,
+                amount,
+            )
+            return False
+
         # Load contents
-        contents = self._load_date_contents(date)
+        self._create_date_if_not_present(date)
+        contents = self._get_date_contents(date)

         # Update contents
         if contents.frontmatter.get(statistic_key) == amount:
@@ -112,11 +106,20 @@ class ObsidianVault:
             del contents.frontmatter[statistic_key]

         # Save contents
-        self._save_date_contents(date, contents)
+        self._save_contents(date, contents)
         return True

     def add_events(self, date: datetime.date, events: list[Event]) -> bool:
-        contents = self._load_date_contents(date)
+        if self.read_only == 'silent':
+            logger.info(
+                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
+                date,
+                events,
+            )
+
+        if not self.read_only:
+            self._create_date_if_not_present(date)
+        contents = self._get_date_contents(date)
         if contents is None:
             return False
@@ -126,23 +129,22 @@ class ObsidianVault:
             return False

         contents = dataclasses.replace(contents, events=updated_events)
-        self._save_date_contents(date, contents)
+        if not self.read_only:
+            self._save_contents(date, contents)
         return True

     def get_events(self, date: datetime.date) -> frozenset[Event]:
-        contents = self._load_date_contents(date)
+        contents = self._get_date_contents(date)
         if contents is None:
             return frozenset()
         return contents.events

-    def _load_date_contents(self, date: datetime.date) -> FileContents | None:
-        file_path = self._date_file_path(date)
-        text = self._load_file_text(file_path) or self._load_file_text(
-            self._daily_template_path(),
-        )
-        assert text is not None
-        file_frontmatter = frontmatter.loads(text)
+    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
+        try:
+            with open(self._date_file_path(date)) as f:
+                file_frontmatter = frontmatter.load(f)
+        except FileNotFoundError:
+            return None

         ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
         (pre_events, list_block_items, post_events) = find_events_list_block(ast)
@@ -151,7 +153,8 @@ class ObsidianVault:
         )
         return FileContents(file_frontmatter.metadata, pre_events, events, post_events)

-    def _save_date_contents(self, date: datetime.date, contents: FileContents) -> None:
+    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
+        logger.info('Formatting file "%s"', date)
         blocks_pre_events = ''.join(
             MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
         )
@@ -160,32 +163,31 @@ class ObsidianVault:
         )

         events = list(contents.events)
-        events.sort(key=lambda x: x.subject or '')
-        events.sort(key=lambda x: x.verb or '')
+        events.sort()
         events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
         block_events = '\n'.join('- ' + format_event_string(e) for e in events)

-        post = frontmatter.Post(
-            content=FILE_FORMAT.format(
-                blocks_pre_events=blocks_pre_events,
-                blocks_post_events=blocks_post_events,
-                block_events=block_events,
-            ).strip(),
-            metadata=contents.frontmatter,
-        )
+        text = FILE_FORMAT.format(
+            blocks_pre_events=blocks_pre_events,
+            blocks_post_events=blocks_post_events,
+            block_events=block_events,
+        ).strip()

-        self._save_file_text_to_cache(
-            self._date_file_path(date),
-            frontmatter.dumps(post).encode('utf8'),
-        )
+        logger.info('Saving file "%s"', date)
+        with open(self._date_file_path(date), 'wb') as f:
+            frontmatter.dump(frontmatter.Post(text, **contents.frontmatter), f)

-    def _save_file_text_to_cache(self, path: Path, text: bytes) -> None:
-        if path not in self.internal_file_text_cache:
-            self.internal_file_text_cache[path] = CachedFile(None, False)
-        self.internal_file_text_cache[path].data = text
-        self.internal_file_text_cache[path].is_dirty = True
+    def _create_date_if_not_present(self, date: datetime.date):
+        date_file = self._date_file_path(date)
+        if date_file.exists():
+            return
+        logger.info('File "%s" doesn\'t exist, creating...', date)
+        with open(self._daily_template_path()) as f:
+            template_text = f.read()
+        date_file.parent.mkdir(exist_ok=True, parents=True)
+        with open(date_file, 'w') as f:
+            f.write(template_text)

-    def _date_file_path(self, date: datetime.date) -> Path:
+    def _date_file_path(self, date: datetime.date):
         path = (
             self.path_format.replace('YYYY', str(date.year))
             .replace('MM', f'{date.month:02d}')
@@ -193,30 +195,9 @@ class ObsidianVault:
         )
         return (self.vault_path / self.daily_folder / path).with_suffix('.md')

-    def _daily_template_path(self) -> Path:
+    def _daily_template_path(self):
         return (self.vault_path / self.template_file_path).with_suffix('.md')

-    def _load_file_text(self, path: Path) -> bytes | None:
-        if path not in self.internal_file_text_cache:
-            try:
-                with open(path, 'rb') as f:
-                    self.internal_file_text_cache[path] = CachedFile(f.read(), False)
-            except FileNotFoundError:
-                return None
-        return self.internal_file_text_cache[path].data
-
-    def flush_cache(self) -> None:
-        if self.read_only:
-            msg = 'Read-only ObsidianVault cannot be flushed'
-            raise RuntimeError(msg)
-        for path, cached_file in self.internal_file_text_cache.items():
-            if cached_file.is_dirty:
-                logger.info('Saving file "%s"', path)
-                path.parent.mkdir(exist_ok=True, parents=True)
-                with open(path, 'wb') as f:
-                    f.write(cached_file.data)
-        del path, cached_file
-

 def find_events_list_block(ast) -> tuple[list, list[str], list]:
     blocks = ast.children
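
For orientation, here is a minimal sketch of how the reworked `ObsidianVault` API on the right-hand side of this file's diff might be driven. The vault path is hypothetical (the new `__init__` asserts a real `.obsidian/` directory with a `daily-notes.json`), and the `Event` field order is copied from the removed test file further down; nothing here is part of the change itself.

```python
import datetime
from pathlib import Path

from obsidian_import import obsidian

# Hypothetical vault location; must contain .obsidian/daily-notes.json.
vault = obsidian.ObsidianVault(Path('~/vault').expanduser(), read_only=False)

# Event field order taken from the removed test file below.
event = obsidian.Event(
    datetime.time(20, 0, 0),
    datetime.time(22, 0, 0),
    'Watched',
    'Tom and Jerry',
    'on the *Television*',
)

# With read_only=False this creates the daily note from the template
# if needed and writes the updated event list back to disk.
vault.add_events(datetime.date(2020, 1, 1), [event])
print(vault.get_events(datetime.date(2020, 1, 1)))
```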

View File

@@ -21,7 +21,6 @@ def get_modules(backend_dir: Path) -> Iterator[str]:
         if name != '__init__':
             yield name

-
FETCHER_MODULES_LOADED = False

View File

@@ -7,7 +7,7 @@ from pathlib import Path
 import requests
 import requests_cache

-from . import data, fetchers, notification, util
+from . import data, notification, util, fetchers

 logger = logging.getLogger(__name__)

View File

@@ -1,6 +0,0 @@
----
-aliases:
-- My day
----
-
-# My day

View File

@@ -1,4 +1,3 @@
 def test_init():
     import personal_data
-
     assert personal_data.__version__ is not None

View File

@@ -1,46 +0,0 @@
-import datetime
-from pathlib import Path
-
-from obsidian_import import obsidian
-
-EXAMPLES = [
-    obsidian.Event(
-        datetime.time(12, 0, 0),
-        datetime.time(12, 0, 0),
-        'Ate',
-        'Lunch',
-        'instantly',
-    ),
-    obsidian.Event(
-        datetime.time(20, 0, 0),
-        datetime.time(22, 0, 0),
-        'Watched',
-        'Tom and Jerry',
-        'on the *Television*',
-    ),
-    obsidian.Event(None, None, None, None, 'Took a walk'),
-    obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
-]
-
-
-def test_write_internally():
-    vault = obsidian.ObsidianVault(
-        Path('test'),
-        read_only=True,
-        allow_invalid_vault=True,
-    )
-    vault.daily_folder = Path('daily')
-    vault.path_format = 'YYYY-MM-DD'
-    vault.template_file_path = Path('daily-template-file.md')
-
-    vault.add_events(datetime.date(2020, 1, 1), EXAMPLES)
-
-    assert len(vault.internal_file_text_cache) == 2
-    assert vault.internal_file_text_cache[
-        Path('test/daily-template-file.md')
-    ].data.startswith(b'---\n')
-
-    expected_path = Path('test/daily/2020-01-01.md')
-    assert expected_path in vault.internal_file_text_cache
-    assert vault.internal_file_text_cache[expected_path].data.startswith(b'---\n')