Improving event import
This commit is contained in:
parent
141ca7c623
commit
a872ed1e85
|
@ -3,6 +3,7 @@
|
||||||
Sub-module for importing time-based data into Obsidian.
|
Sub-module for importing time-based data into Obsidian.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import dataclasses
|
||||||
import datetime
|
import datetime
|
||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
@ -103,10 +104,14 @@ def escape_for_obsidian_link(link: str) -> str:
|
||||||
return link.replace(':', ' ').replace('/', ' ').replace(' ', ' ')
|
return link.replace(':', ' ').replace('/', ' ').replace(' ', ' ')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
class EventContent:
|
||||||
|
verb: str
|
||||||
|
subject: str
|
||||||
|
comment: str
|
||||||
|
|
||||||
def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
|
|
||||||
verb = 'Watched'
|
|
||||||
|
|
||||||
|
def import_activity_sample_csv(vault: ObsidianVault, rows: Rows, content_mapper) -> int:
|
||||||
samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
|
samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
|
||||||
|
|
||||||
samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
|
samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
|
||||||
|
@ -118,18 +123,13 @@ def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
|
||||||
del rows
|
del rows
|
||||||
|
|
||||||
def map_to_event(sample: RealizedActivitySample) -> Event:
|
def map_to_event(sample: RealizedActivitySample) -> Event:
|
||||||
noun = escape_for_obsidian_link(sample.single_label_with_category('series.name'))
|
content = content_mapper(sample)
|
||||||
comment = '{} Episode {}: *{}*'.format(
|
|
||||||
sample.single_label_with_category('season.name'),
|
|
||||||
sample.single_label_with_category('episode.index'),
|
|
||||||
sample.single_label_with_category('episode.name'),
|
|
||||||
)
|
|
||||||
expected_tz = datetime.timezone(datetime.timedelta(hours=2)) # TODO: Determine this in a more intelligent manner
|
expected_tz = datetime.timezone(datetime.timedelta(hours=2)) # TODO: Determine this in a more intelligent manner
|
||||||
return Event(sample.start_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
|
return Event(sample.start_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
|
||||||
sample.end_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
|
sample.end_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
|
||||||
verb,
|
verb=content.verb,
|
||||||
noun,
|
subject=escape_for_obsidian_link(content.subject),
|
||||||
comment,
|
comment=content.comment,
|
||||||
)
|
)
|
||||||
|
|
||||||
num_updated = 0
|
num_updated = 0
|
||||||
|
@ -144,6 +144,43 @@ def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
|
||||||
|
|
||||||
return num_updated
|
return num_updated
|
||||||
|
|
||||||
|
def import_activity_sample_csv_from_file(vault: ObsidianVault, data_path: Path, content_mapper) -> int:
|
||||||
|
rows = load_csv_file(data_path)
|
||||||
|
logger.info('Loaded CSV with %d lines (%s)', len(rows), data_path)
|
||||||
|
num_updated = import_activity_sample_csv(vault, rows, content_mapper)
|
||||||
|
logger.info('Updated %d files', num_updated)
|
||||||
|
|
||||||
|
def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
|
||||||
|
subject = sample.single_label_with_category('series.name')
|
||||||
|
comment = '{} Episode {}: *{}*'.format(
|
||||||
|
sample.single_label_with_category('season.name'),
|
||||||
|
sample.single_label_with_category('episode.index'),
|
||||||
|
sample.single_label_with_category('episode.name'),
|
||||||
|
)
|
||||||
|
return EventContent(
|
||||||
|
verb='Watched',
|
||||||
|
subject=subject,
|
||||||
|
comment=comment,
|
||||||
|
)
|
||||||
|
|
||||||
|
def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
|
||||||
|
subject = sample.single_label_with_category('game.name')
|
||||||
|
comment = '![]({})'.format(
|
||||||
|
sample.single_label_with_category('trophy.icon')
|
||||||
|
)
|
||||||
|
return EventContent(
|
||||||
|
verb='Played',
|
||||||
|
subject=subject,
|
||||||
|
comment=comment,
|
||||||
|
)
|
||||||
|
|
||||||
|
def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
|
||||||
|
data_path = Path('output/show_episodes_watched.csv')
|
||||||
|
return import_activity_sample_csv_from_file(vault, data_path, map_watched_series_content)
|
||||||
|
|
||||||
|
def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
|
||||||
|
data_path = Path('output/games_played_playstation.csv')
|
||||||
|
return import_activity_sample_csv_from_file(vault, data_path, map_games_played_content)
|
||||||
|
|
||||||
def import_data(obsidian_path: Path, dry_run=True):
|
def import_data(obsidian_path: Path, dry_run=True):
|
||||||
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
|
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
|
||||||
|
@ -164,9 +201,5 @@ def import_data(obsidian_path: Path, dry_run=True):
|
||||||
num_updated = import_step_counts_csv(vault, rows)
|
num_updated = import_step_counts_csv(vault, rows)
|
||||||
logger.info('Updated %d files', num_updated)
|
logger.info('Updated %d files', num_updated)
|
||||||
|
|
||||||
if True:
|
import_watched_series_csv_from_file(vault)
|
||||||
data_path = Path('output/show_episodes_watched.csv')
|
import_played_games_csv_from_file(vault)
|
||||||
rows = load_csv_file(data_path)
|
|
||||||
logger.info('Loaded CSV with %d lines', len(rows))
|
|
||||||
num_updated = import_watched_series_csv(vault, rows)
|
|
||||||
logger.info('Updated %d files', num_updated)
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ class Event:
|
||||||
class FileContents:
|
class FileContents:
|
||||||
frontmatter: dict[str, Any]
|
frontmatter: dict[str, Any]
|
||||||
blocks_pre_events: list
|
blocks_pre_events: list
|
||||||
events: list[Event]
|
events: frozenset[Event]
|
||||||
blocks_post_events: list
|
blocks_post_events: list
|
||||||
|
|
||||||
|
|
||||||
|
@ -108,24 +108,33 @@ class ObsidianVault:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def add_events(self, date: datetime.date, events: list[Event]) -> bool:
|
def add_events(self, date: datetime.date, events: list[Event]) -> bool:
|
||||||
|
if not self.read_only:
|
||||||
|
self._create_date_if_not_present(date)
|
||||||
if self.read_only == 'silent':
|
if self.read_only == 'silent':
|
||||||
logger.info(
|
logger.info(
|
||||||
'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
|
'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
|
||||||
date,
|
date,
|
||||||
events,
|
events,
|
||||||
)
|
)
|
||||||
return False
|
|
||||||
|
|
||||||
self._create_date_if_not_present(date)
|
|
||||||
contents = self._get_date_contents(date)
|
|
||||||
contents.events.extend(events)
|
|
||||||
self._save_contents(date, contents)
|
|
||||||
return True
|
|
||||||
|
|
||||||
def get_events(self, date: datetime.date) -> list[Event]:
|
|
||||||
contents = self._get_date_contents(date)
|
contents = self._get_date_contents(date)
|
||||||
if contents is None:
|
if contents is None:
|
||||||
return []
|
return False
|
||||||
|
|
||||||
|
# Exit without writing if there were no changes.
|
||||||
|
updated_events: frozenset[Event] = contents.events | set(events)
|
||||||
|
if contents.events == updated_events:
|
||||||
|
return False
|
||||||
|
|
||||||
|
contents = dataclasses.replace(contents, events = updated_events)
|
||||||
|
if not self.read_only:
|
||||||
|
self._save_contents(date, contents)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get_events(self, date: datetime.date) -> frozenset[Event]:
|
||||||
|
contents = self._get_date_contents(date)
|
||||||
|
if contents is None:
|
||||||
|
return frozenset()
|
||||||
return contents.events
|
return contents.events
|
||||||
|
|
||||||
def _get_date_contents(self, date: datetime.date) -> FileContents | None:
|
def _get_date_contents(self, date: datetime.date) -> FileContents | None:
|
||||||
|
@ -137,7 +146,7 @@ class ObsidianVault:
|
||||||
|
|
||||||
ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
|
ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
|
||||||
(pre_events, list_block_items, post_events) = find_events_list_block(ast)
|
(pre_events, list_block_items, post_events) = find_events_list_block(ast)
|
||||||
events = [parse_event_string(list_item) for list_item in list_block_items]
|
events = frozenset(parse_event_string(list_item) for list_item in list_block_items)
|
||||||
return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
|
return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
|
||||||
|
|
||||||
def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
|
def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
|
||||||
|
@ -148,8 +157,10 @@ class ObsidianVault:
|
||||||
blocks_post_events = ''.join(
|
blocks_post_events = ''.join(
|
||||||
MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
|
MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
|
||||||
)
|
)
|
||||||
|
|
||||||
|
events = sorted(contents.events, key=lambda x: x.start_time or x.end_time)
|
||||||
block_events = '\n'.join(
|
block_events = '\n'.join(
|
||||||
'- ' + format_event_string(e) for e in unique(contents.events)
|
'- ' + format_event_string(e) for e in events
|
||||||
)
|
)
|
||||||
text = FILE_FORMAT.format(
|
text = FILE_FORMAT.format(
|
||||||
blocks_pre_events=blocks_pre_events,
|
blocks_pre_events=blocks_pre_events,
|
||||||
|
@ -254,7 +265,3 @@ def parse_event_string(event_str: str) -> Event:
|
||||||
return Event(start, end, m.group(3), m.group(4), m.group(5))
|
return Event(start, end, m.group(3), m.group(4), m.group(5))
|
||||||
logger.info('Could not parse format: %s', event_str)
|
logger.info('Could not parse format: %s', event_str)
|
||||||
return Event(None, None, None, None, event_str)
|
return Event(None, None, None, None, event_str)
|
||||||
|
|
||||||
|
|
||||||
def unique(ls: list) -> list:
|
|
||||||
return list(dict.fromkeys(ls))
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user