Compare commits

3 Commits: 57918ec9b9 ... 30bbab2841

| Author | SHA1 | Date |
|---|---|---|
| | 30bbab2841 | |
| | 6d91e54ced | |
| | 43627f2aa7 | |
@@ -41,7 +41,9 @@ def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
     for event_data in rows:
         (start_at, end_at) = start_end(event_data, possible_keys)
-        labels = [Label(k, event_data[k]) for k in possible_keys.misc]
+        labels = [
+            Label(k, event_data.get(k)) for k in possible_keys.misc if k in event_data
+        ]

         # Create event
         yield ActivitySample(
@@ -243,3 +245,12 @@ def import_data(obsidian_path: Path, dry_run=True):
     import_watched_series_csv_from_file(vault)
     import_played_games_csv_from_file(vault)
+
+    num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
+    logger.info('dirty files in cache: %d', num_dirty)
+    logger.info(
+        'clean files in cache: %d',
+        len(vault.internal_file_text_cache) - num_dirty,
+    )
+    if not dry_run:
+        vault.flush_cache()
@@ -38,6 +38,12 @@ class FileContents:
     blocks_post_events: list


+@dataclasses.dataclass(frozen=False)
+class CachedFile:
+    data: bytes
+    is_dirty: bool
+
+
 MARKDOWN_PARSER = marko.Markdown()
 MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()
@@ -52,24 +58,35 @@ MIDNIGHT = datetime.time(0, 0, 0)
 class ObsidianVault:
-    def __init__(self, vault_path: Path, read_only: bool = 'silent'):
+    def __init__(
+        self,
+        vault_path: Path,
+        read_only: bool = 'silent',
+        allow_invalid_vault=False,
+    ):
         self.vault_path = vault_path
-        assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
-
-        with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
-            daily_notes_config = json.load(f)
-        self.daily_folder = daily_notes_config['folder']
-        self.path_format = daily_notes_config['format']
-        self.template_file_path = daily_notes_config['template']
-        self.read_only = read_only
+        self.read_only = read_only
+        self.internal_file_text_cache: dict[Path, CachedFile] = {}
+
+        if not allow_invalid_vault:
+            assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
+
+        try:
+            with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
+                daily_notes_config = json.load(f)
+            self.daily_folder = daily_notes_config['folder']
+            self.path_format = daily_notes_config['format']
+            self.template_file_path = daily_notes_config['template']
+        except FileNotFoundError:
+            if not allow_invalid_vault:
+                assert False, 'Missing daily notes configuration!'

     def get_statistic(
         self,
         date: datetime.date,
         statistic_key: StatisticKey,
     ) -> Any | None:
-        if contents := self._get_date_contents(date):
+        if contents := self._load_date_contents(date):
             return contents.frontmatter.get(statistic_key)
         return None
@@ -83,19 +100,8 @@ class ObsidianVault:
         if isinstance(amount, Decimal):
             amount = float(amount)

-        # Check for silent
-        if self.read_only == 'silent':
-            logger.info(
-                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
-                date,
-                statistic_key,
-                amount,
-            )
-            return False
-
         # Load contents
-        self._create_date_if_not_present(date)
-        contents = self._get_date_contents(date)
+        contents = self._load_date_contents(date)

         # Update contents
         if contents.frontmatter.get(statistic_key) == amount:
@@ -106,20 +112,11 @@ class ObsidianVault:
             del contents.frontmatter[statistic_key]

         # Save contents
-        self._save_contents(date, contents)
+        self._save_date_contents(date, contents)
         return True

     def add_events(self, date: datetime.date, events: list[Event]) -> bool:
-        if self.read_only == 'silent':
-            logger.info(
-                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
-                date,
-                events,
-            )
-        if not self.read_only:
-            self._create_date_if_not_present(date)
-
-        contents = self._get_date_contents(date)
+        contents = self._load_date_contents(date)
         if contents is None:
             return False
@@ -129,22 +126,23 @@ class ObsidianVault:
             return False

         contents = dataclasses.replace(contents, events=updated_events)
-        if not self.read_only:
-            self._save_contents(date, contents)
+        self._save_date_contents(date, contents)
         return True

     def get_events(self, date: datetime.date) -> frozenset[Event]:
-        contents = self._get_date_contents(date)
+        contents = self._load_date_contents(date)
         if contents is None:
             return frozenset()
         return contents.events

-    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
-        try:
-            with open(self._date_file_path(date)) as f:
-                file_frontmatter = frontmatter.load(f)
-        except FileNotFoundError:
-            return None
+    def _load_date_contents(self, date: datetime.date) -> FileContents | None:
+        file_path = self._date_file_path(date)
+        text = self._load_file_text(file_path) or self._load_file_text(
+            self._daily_template_path(),
+        )
+        assert text is not None
+
+        file_frontmatter = frontmatter.loads(text)

         ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
         (pre_events, list_block_items, post_events) = find_events_list_block(ast)
@@ -153,8 +151,7 @@ class ObsidianVault:
         )
         return FileContents(file_frontmatter.metadata, pre_events, events, post_events)

-    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
-        logger.info('Formatting file "%s"', date)
+    def _save_date_contents(self, date: datetime.date, contents: FileContents) -> None:
         blocks_pre_events = ''.join(
             MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
         )
@@ -163,31 +160,32 @@ class ObsidianVault:
         )

         events = list(contents.events)
-        events.sort()
+        events.sort(key=lambda x: x.subject or '')
+        events.sort(key=lambda x: x.verb or '')
         events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
         block_events = '\n'.join('- ' + format_event_string(e) for e in events)
-        text = FILE_FORMAT.format(
-            blocks_pre_events=blocks_pre_events,
-            blocks_post_events=blocks_post_events,
-            block_events=block_events,
-        ).strip()
-
-        logger.info('Saving file "%s"', date)
-        with open(self._date_file_path(date), 'wb') as f:
-            frontmatter.dump(frontmatter.Post(text, **contents.frontmatter), f)
-
-    def _create_date_if_not_present(self, date: datetime.date):
-        date_file = self._date_file_path(date)
-        if date_file.exists():
-            return
-        logger.info('File "%s" doesn\'t exist, creating...', date)
-        with open(self._daily_template_path()) as f:
-            template_text = f.read()
-        date_file.parent.mkdir(exist_ok=True, parents=True)
-        with open(date_file, 'w') as f:
-            f.write(template_text)
-
-    def _date_file_path(self, date: datetime.date):
+        post = frontmatter.Post(
+            content=FILE_FORMAT.format(
+                blocks_pre_events=blocks_pre_events,
+                blocks_post_events=blocks_post_events,
+                block_events=block_events,
+            ).strip(),
+            metadata=contents.frontmatter,
+        )
+
+        self._save_file_text_to_cache(
+            self._date_file_path(date),
+            frontmatter.dumps(post).encode('utf8'),
+        )
+
+    def _save_file_text_to_cache(self, path: Path, text: bytes) -> None:
+        if path not in self.internal_file_text_cache:
+            self.internal_file_text_cache[path] = CachedFile(None, False)
+        self.internal_file_text_cache[path].data = text
+        self.internal_file_text_cache[path].is_dirty = True
+
+    def _date_file_path(self, date: datetime.date) -> Path:
         path = (
             self.path_format.replace('YYYY', str(date.year))
             .replace('MM', f'{date.month:02d}')
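The rewritten _save_date_contents serialises through frontmatter.Post and frontmatter.dumps instead of writing via frontmatter.dump directly. A minimal sketch of that round-trip with python-frontmatter, using illustrative values rather than the vault's real FILE_FORMAT and metadata:

import frontmatter

# Illustrative values; the diff builds the body from FILE_FORMAT and the
# metadata from contents.frontmatter.
post = frontmatter.Post('# My day\n', aliases=['My day'])
text = frontmatter.dumps(post)  # YAML front matter followed by the markdown body
assert text.startswith('---')

reloaded = frontmatter.loads(text)
assert reloaded.metadata['aliases'] == ['My day']
assert reloaded.content.strip() == '# My day'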
@@ -195,9 +193,30 @@ class ObsidianVault:
         )
         return (self.vault_path / self.daily_folder / path).with_suffix('.md')

-    def _daily_template_path(self):
+    def _daily_template_path(self) -> Path:
         return (self.vault_path / self.template_file_path).with_suffix('.md')

+    def _load_file_text(self, path: Path) -> bytes | None:
+        if path not in self.internal_file_text_cache:
+            try:
+                with open(path, 'rb') as f:
+                    self.internal_file_text_cache[path] = CachedFile(f.read(), False)
+            except FileNotFoundError:
+                return None
+        return self.internal_file_text_cache[path].data
+
+    def flush_cache(self) -> None:
+        if self.read_only:
+            msg = 'Read-only ObsidianVault cannot be flushed'
+            raise RuntimeError(msg)
+        for path, cached_file in self.internal_file_text_cache.items():
+            if cached_file.is_dirty:
+                logger.info('Saving file "%s"', path)
+                path.parent.mkdir(exist_ok=True, parents=True)
+                with open(path, 'wb') as f:
+                    f.write(cached_file.data)
+            del path, cached_file
+
+
 def find_events_list_block(ast) -> tuple[list, list[str], list]:
     blocks = ast.children
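Taken together, _load_file_text, _save_file_text_to_cache, and flush_cache turn the vault into a write-through cache: edits accumulate in internal_file_text_cache and only reach disk on an explicit flush. A rough usage sketch under that assumption (the vault path and event values are placeholders, not taken from the repository):

import datetime
from pathlib import Path

from obsidian_import import obsidian

vault = obsidian.ObsidianVault(Path('/path/to/vault'), read_only=False)

# Mutations only touch the in-memory cache; nothing is written to disk yet.
vault.add_events(
    datetime.date(2020, 1, 1),
    [obsidian.Event(None, None, None, None, 'Took a walk')],
)

# flush_cache() writes every dirty CachedFile back to its path;
# it raises RuntimeError when the vault is read-only.
vault.flush_cache()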
@@ -21,6 +21,7 @@ def get_modules(backend_dir: Path) -> Iterator[str]:
         if name != '__init__':
             yield name


 FETCHER_MODULES_LOADED = False
+
@@ -7,7 +7,7 @@ from pathlib import Path
 import requests
 import requests_cache

-from . import data, notification, util, fetchers
+from . import data, fetchers, notification, util

 logger = logging.getLogger(__name__)
test/daily-template-file.md (new file, 6 lines)
@@ -0,0 +1,6 @@
+---
+aliases:
+- My day
+---
+
+# My day
@@ -1,3 +1,4 @@
 def test_init():
     import personal_data
+
     assert personal_data.__version__ is not None
test/test_obsidian_vault.py (new file, 46 lines)
@@ -0,0 +1,46 @@
+import datetime
+from pathlib import Path
+
+from obsidian_import import obsidian
+
+EXAMPLES = [
+    obsidian.Event(
+        datetime.time(12, 0, 0),
+        datetime.time(12, 0, 0),
+        'Ate',
+        'Lunch',
+        'instantly',
+    ),
+    obsidian.Event(
+        datetime.time(20, 0, 0),
+        datetime.time(22, 0, 0),
+        'Watched',
+        'Tom and Jerry',
+        'on the *Television*',
+    ),
+    obsidian.Event(None, None, None, None, 'Took a walk'),
+    obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
+]
+
+
+def test_write_internally():
+    vault = obsidian.ObsidianVault(
+        Path('test'),
+        read_only=True,
+        allow_invalid_vault=True,
+    )
+    vault.daily_folder = Path('daily')
+    vault.path_format = 'YYYY-MM-DD'
+    vault.template_file_path = Path('daily-template-file.md')
+
+    vault.add_events(datetime.date(2020, 1, 1), EXAMPLES)
+    assert len(vault.internal_file_text_cache) == 2
+
+    assert vault.internal_file_text_cache[
+        Path('test/daily-template-file.md')
+    ].data.startswith(b'---\n')
+
+    expected_path = Path('test/daily/2020-01-01.md')
+    assert expected_path in vault.internal_file_text_cache
+
+    assert vault.internal_file_text_cache[expected_path].data.startswith(b'---\n')