1
0

Compare commits

...

3 Commits

Author SHA1 Message Date
5255206cf4
Improved events order
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 33s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 29s
2024-10-22 00:41:47 +02:00
be0a30298d
Games played import works 2024-10-22 00:04:34 +02:00
a872ed1e85
Improving event import 2024-10-21 23:39:25 +02:00
2 changed files with 88 additions and 37 deletions

View File

@ -3,6 +3,7 @@
Sub-module for importing time-based data into Obsidian.
"""
import dataclasses
import datetime
from logging import getLogger
from pathlib import Path
@ -10,7 +11,9 @@ from typing import Any
from collections.abc import Iterator
from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file
from personal_data.activity import ActivitySample, Label, RealizedActivitySample, heuristically_realize_samples
from personal_data.activity import (ActivitySample, Label,
RealizedActivitySample, heuristically_realize_samples, merge_adjacent_samples
)
from .obsidian import Event, ObsidianVault
@ -103,12 +106,20 @@ def escape_for_obsidian_link(link: str) -> str:
return link.replace(':', ' ').replace('/', ' ').replace(' ', ' ')
@dataclasses.dataclass(frozen=True)
class EventContent:
verb: str
subject: str
comment: str
def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
verb = 'Watched'
def import_activity_sample_csv(vault: ObsidianVault, rows: Rows,
content_mapper, group_category: str | None = None) -> int:
samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
if group_category is not None:
samples = merge_adjacent_samples(list(samples), group_category)
samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
for sample in samples:
date: datetime.date = sample.start_at.date()
@ -118,18 +129,13 @@ def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
del rows
def map_to_event(sample: RealizedActivitySample) -> Event:
noun = escape_for_obsidian_link(sample.single_label_with_category('series.name'))
comment = '{} Episode {}: *{}*'.format(
sample.single_label_with_category('season.name'),
sample.single_label_with_category('episode.index'),
sample.single_label_with_category('episode.name'),
)
content = content_mapper(sample)
expected_tz = datetime.timezone(datetime.timedelta(hours=2)) # TODO: Determine this in a more intelligent manner
return Event(sample.start_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
sample.end_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
verb,
noun,
comment,
verb=content.verb,
subject=escape_for_obsidian_link(content.subject),
comment=content.comment,
)
num_updated = 0
@ -144,6 +150,44 @@ def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
return num_updated
def import_activity_sample_csv_from_file(vault: ObsidianVault, data_path: Path,
content_mapper, **kwargs) -> int:
rows = load_csv_file(data_path)
logger.info('Loaded CSV with %d lines (%s)', len(rows), data_path)
num_updated = import_activity_sample_csv(vault, rows, content_mapper, **kwargs)
logger.info('Updated %d files', num_updated)
def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
subject = sample.single_label_with_category('series.name')
comment = '{} Episode {}: *{}*'.format(
sample.single_label_with_category('season.name'),
sample.single_label_with_category('episode.index'),
sample.single_label_with_category('episode.name'),
)
return EventContent(
verb='Watched',
subject=subject,
comment=comment,
)
def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
subject = sample.single_label_with_category('game.name')
comment = ''
return EventContent(
verb='Played',
subject=subject,
comment=comment,
)
def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/show_episodes_watched.csv')
return import_activity_sample_csv_from_file(vault, data_path, map_watched_series_content)
def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/games_played_playstation.csv')
return import_activity_sample_csv_from_file(vault, data_path,
map_games_played_content,
group_category='game.name')
def import_data(obsidian_path: Path, dry_run=True):
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
@ -164,9 +208,5 @@ def import_data(obsidian_path: Path, dry_run=True):
num_updated = import_step_counts_csv(vault, rows)
logger.info('Updated %d files', num_updated)
if True:
data_path = Path('output/show_episodes_watched.csv')
rows = load_csv_file(data_path)
logger.info('Loaded CSV with %d lines', len(rows))
num_updated = import_watched_series_csv(vault, rows)
logger.info('Updated %d files', num_updated)
import_watched_series_csv_from_file(vault)
import_played_games_csv_from_file(vault)

View File

@ -16,7 +16,7 @@ logger = getLogger(__name__)
StatisticKey = str
@dataclasses.dataclass(frozen=True)
@dataclasses.dataclass(frozen=True, order=True)
class Event:
start_time: datetime.time | None
end_time: datetime.time | None
@ -34,7 +34,7 @@ class Event:
class FileContents:
frontmatter: dict[str, Any]
blocks_pre_events: list
events: list[Event]
events: frozenset[Event]
blocks_post_events: list
@ -48,6 +48,8 @@ FILE_FORMAT = """
{blocks_post_events}
"""
MIDNIGHT = datetime.time(0,0,0)
class ObsidianVault:
def __init__(self, vault_path: Path, read_only: bool = 'silent'):
@ -114,18 +116,27 @@ class ObsidianVault:
date,
events,
)
return False
if not self.read_only:
self._create_date_if_not_present(date)
self._create_date_if_not_present(date)
contents = self._get_date_contents(date)
contents.events.extend(events)
self._save_contents(date, contents)
return True
def get_events(self, date: datetime.date) -> list[Event]:
contents = self._get_date_contents(date)
if contents is None:
return []
return False
# Exit without writing if there were no changes.
updated_events: frozenset[Event] = contents.events | set(events)
if contents.events == updated_events:
return False
contents = dataclasses.replace(contents, events = updated_events)
if not self.read_only:
self._save_contents(date, contents)
return True
def get_events(self, date: datetime.date) -> frozenset[Event]:
contents = self._get_date_contents(date)
if contents is None:
return frozenset()
return contents.events
def _get_date_contents(self, date: datetime.date) -> FileContents | None:
@ -137,7 +148,7 @@ class ObsidianVault:
ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
(pre_events, list_block_items, post_events) = find_events_list_block(ast)
events = [parse_event_string(list_item) for list_item in list_block_items]
events = frozenset(parse_event_string(list_item) for list_item in list_block_items)
return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
@ -148,8 +159,12 @@ class ObsidianVault:
blocks_post_events = ''.join(
MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
)
events = list(contents.events)
events.sort()
events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
block_events = '\n'.join(
'- ' + format_event_string(e) for e in unique(contents.events)
'- ' + format_event_string(e) for e in events
)
text = FILE_FORMAT.format(
blocks_pre_events=blocks_pre_events,
@ -168,6 +183,7 @@ class ObsidianVault:
logger.info('File "%s" doesn\'t exist, creating...', date)
with open(self._daily_template_path()) as f:
template_text = f.read()
date_file.parent.mkdir(exist_ok=True, parents=True)
with open(date_file, 'w') as f:
f.write(template_text)
@ -223,8 +239,7 @@ def format_event_string(event: Event) -> str:
buf.append(event.verb)
buf.append(' [[')
buf.append(event.subject)
buf.append(']]. ')
buf.append(event.comment.strip())
buf.append((']]. ' + event.comment).strip())
return ''.join(buf)
@ -254,7 +269,3 @@ def parse_event_string(event_str: str) -> Event:
return Event(start, end, m.group(3), m.group(4), m.group(5))
logger.info('Could not parse format: %s', event_str)
return Event(None, None, None, None, event_str)
def unique(ls: list) -> list:
return list(dict.fromkeys(ls))