1
0

Compare commits

..

No commits in common. "6d1f23b1a08ca15f9814893abb743d3c93fcc9a1" and "8d9e6aefa4852073eb28b4576955fbeb2c539d2d" have entirely different histories.

7 changed files with 72 additions and 81 deletions

View File

@ -4,9 +4,8 @@ Sub-module for importing time-based data into Obsidian.
"""
import dataclasses
from zoneinfo import ZoneInfo
import datetime
from collections.abc import Iterator, Iterable
from collections.abc import Iterator
from logging import getLogger
from pathlib import Path
from typing import Any
@ -120,6 +119,7 @@ class EventContent:
subject: str
comment: str
def import_activity_sample_csv(
vault: ObsidianVault,
rows: Rows,
@ -131,11 +131,9 @@ def import_activity_sample_csv(
if group_category is not None:
samples = merge_adjacent_samples(list(samples), group_category)
timezone = ZoneInfo('Europe/Copenhagen') # TODO: Parameterize in an intelligent manner
samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
for sample in samples:
date: datetime.date = sample.start_at.astimezone(timezone).date()
date: datetime.date = sample.start_at.date()
samples_per_date.setdefault(date, [])
samples_per_date[date].append(sample)
del date, sample
@ -143,9 +141,16 @@ def import_activity_sample_csv(
def map_to_event(sample: RealizedActivitySample) -> Event:
content = content_mapper(sample)
expected_tz = datetime.timezone(
datetime.timedelta(hours=2),
) # TODO: Determine this in a more intelligent manner
return Event(
sample.start_at,
sample.end_at,
sample.start_at.astimezone(expected_tz)
.replace(second=0, microsecond=0)
.time(),
sample.end_at.astimezone(expected_tz)
.replace(second=0, microsecond=0)
.time(),
verb=content.verb,
subject=escape_for_obsidian_link(content.subject),
comment=content.comment,
@ -153,7 +158,7 @@ def import_activity_sample_csv(
num_updated = 0
for date, samples in list(samples_per_date.items()):
for date, samples in samples_per_date.items():
events = [map_to_event(sample) for sample in samples]
was_updated = vault.add_events(date, events)
@ -211,9 +216,6 @@ def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/games_played.csv')
if not data_path.exists():
logger.warning('Skipping import of played games: %s is missing', data_path)
return 0
return import_activity_sample_csv_from_file(
vault,
data_path,
@ -241,8 +243,8 @@ def import_data(obsidian_path: Path, dry_run=True):
num_updated = import_step_counts_csv(vault, rows)
logger.info('Updated %d files', num_updated)
import_played_games_csv_from_file(vault)
import_watched_series_csv_from_file(vault)
import_played_games_csv_from_file(vault)
num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
logger.info('dirty files in cache: %d', num_dirty)

View File

@ -6,8 +6,6 @@ from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import enforce_typing
import frontmatter
import marko
@ -18,11 +16,10 @@ logger = getLogger(__name__)
StatisticKey = str
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class Event:
start_time: datetime.datetime | None
end_time: datetime.datetime | None
start_time: datetime.time | None
end_time: datetime.time | None
verb: str | None
subject: str | None
comment: str
@ -32,13 +29,13 @@ class Event:
assert ':' not in self.subject
assert '/' not in self.subject
@dataclasses.dataclass(frozen=True)
class FileContents:
frontmatter: dict[str, Any]
blocks_pre_events: list
events: frozenset[Event]
blocks_post_events: list
timezone: ZoneInfo
@dataclasses.dataclass(frozen=False)
@ -57,6 +54,9 @@ FILE_FORMAT = """
{blocks_post_events}
"""
MIDNIGHT = datetime.time(0, 0, 0)
class ObsidianVault:
def __init__(
self,
@ -136,8 +136,6 @@ class ObsidianVault:
return contents.events
def _load_date_contents(self, date: datetime.date) -> FileContents | None:
timezone = ZoneInfo('Europe/Copenhagen') # TODO: Parameterize in an intelligent manner
file_path = self._date_file_path(date)
text = self._load_file_text(file_path) or self._load_file_text(
self._daily_template_path(),
@ -149,9 +147,9 @@ class ObsidianVault:
ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
(pre_events, list_block_items, post_events) = find_events_list_block(ast)
events = frozenset(
parse_event_string(list_item, date, timezone) for list_item in list_block_items
parse_event_string(list_item) for list_item in list_block_items
)
return FileContents(file_frontmatter.metadata, pre_events, events, post_events, timezone)
return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
def _save_date_contents(self, date: datetime.date, contents: FileContents) -> None:
blocks_pre_events = ''.join(
@ -162,15 +160,10 @@ class ObsidianVault:
)
events = list(contents.events)
events.sort(key=lambda x: x.comment or '')
events.sort(key=lambda x: x.subject or '')
events.sort(key=lambda x: x.verb or '')
date_sentinel = datetime.datetime(1900, 1, 1, 1, 1, 1, tzinfo=contents.timezone)
events.sort(key=lambda x: x.start_time or x.end_time or date_sentinel)
formatted_events = ['- ' + format_event_string(e, tz = contents.timezone) for e in events]
formatted_events = list(dict.fromkeys(formatted_events))
block_events = '\n'.join(formatted_events)
events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
block_events = '\n'.join('- ' + format_event_string(e) for e in events)
post = frontmatter.Post(
content=FILE_FORMAT.format(
@ -232,7 +225,7 @@ def find_events_list_block(ast) -> tuple[list, list[str], list]:
isinstance(block, marko.block.Heading)
and block.children[0].children.lower() == 'events'
):
events_block = ast.children[block_i + 1] if block_i + 1 < len(ast.children) else None
events_block = ast.children[block_i + 1]
if isinstance(events_block, marko.block.List):
offset = 2
event_texts = [
@ -247,7 +240,7 @@ def find_events_list_block(ast) -> tuple[list, list[str], list]:
return (blocks, [], [])
def format_event_string(event: Event, tz: ZoneInfo) -> str:
def format_event_string(event: Event) -> str:
assert event is not None
if (
event.start_time is None
@ -258,9 +251,9 @@ def format_event_string(event: Event, tz: ZoneInfo) -> str:
return event.comment
buf = []
buf.append(f'{event.start_time.astimezone(tz):%H:%M}')
buf.append(f'{event.start_time:%H:%M}')
if event.end_time and event.end_time != event.start_time:
buf.append(f'-{event.end_time.astimezone(tz):%H:%M}')
buf.append(f'-{event.end_time:%H:%M}')
buf.append(' | ')
buf.append(event.verb)
buf.append(' [[')
@ -273,14 +266,12 @@ def format_event_string(event: Event, tz: ZoneInfo) -> str:
RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+?))?)'
RE_VERB = r'(\w+(?:ed|te))'
RE_LINK_MD = r'\[([^\]:/]*)\]\(?:[^)]*\)'
RE_LINK_WIKI = r'\[\[(?:[^\]:]*\/)?([^\]:/]*)\]\]'
RE_LINK_WIKI = r'\[\[([^\]:/]*)\]\]'
RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'
def parse_event_string(event_str: str, date: datetime.date, timezone: ZoneInfo) -> Event:
"""Parses event string for the given date.
"""
def parse_event_string(event_str: str) -> Event:
if m := re.match(
r'^\s*'
+ RE_TIME_FORMAT
@ -291,9 +282,10 @@ def parse_event_string(event_str: str, date: datetime.date, timezone: ZoneInfo)
+ r'\.?\s*(.*)$',
event_str,
):
start_time = datetime.time.fromisoformat(m.group(1))
end_time = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start_time
elif m := re.match(
start = datetime.time.fromisoformat(m.group(1))
end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
return Event(start, end, m.group(3), m.group(4), m.group(5))
if m := re.match(
r'^\s*'
+ RE_TIME_FORMAT
+ r'[ :\|-]*'
@ -303,13 +295,8 @@ def parse_event_string(event_str: str, date: datetime.date, timezone: ZoneInfo)
+ r'\.?\s*(.*)$',
event_str,
):
start_time = datetime.time.fromisoformat(m.group(1))
end_time = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start_time
else:
logger.info('Could not parse format: %s', event_str)
return Event(None, None, None, None, event_str)
start = datetime.datetime.combine(date, start_time, timezone).astimezone(datetime.UTC)
end = datetime.datetime.combine(date, end_time, timezone).astimezone(datetime.UTC)
return Event(start, end, m.group(3), m.group(4), m.group(5))
start = datetime.time.fromisoformat(m.group(1))
end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
return Event(start, end, m.group(3), m.group(4), m.group(5))
logger.info('Could not parse format: %s', event_str)
return Event(None, None, None, None, event_str)

View File

@ -21,8 +21,6 @@ URL_USER_GAME_TROPHIES = URL_API_ROOT + 'trophies/{game_id}/{psn_id}'
URL_GAMES_OVERVIEW = URL_API_ROOT + '{psn_id}'
PSN_PROFILES_DEFAULT_TIMEZONE=datetime.UTC
def game_psnprofiles_id_from_url(relative_url: str) -> int:
m = re.match(r'/(?:trophy|trophies)/(\d+)\-(?:[\w-]+)(/[\w-]*)?', relative_url)
result = m.group(1)
@ -197,7 +195,7 @@ class PsnProfiles(Scraper):
if 'Missing\nTimestamp' in cells[2].get_text().strip():
continue
cells[2].span.span.nobr.sup.extract()
gotten_at = parse_util.parse_time(cells[2].get_text(), timezone=PSN_PROFILES_DEFAULT_TIMEZONE)
gotten_at = parse_util.parse_time(cells[2].get_text())
yield {
'game.name': game_name,

View File

@ -41,12 +41,14 @@ LOCAL_TIMEZONE = NOW.astimezone().tzinfo
def try_parse(text: str, fmt: str) -> datetime.datetime | None:
try:
time = datetime.datetime.strptime(text, fmt) # noqa: DTZ007
if time.tzinfo is None:
time = time.replace(tzinfo=LOCAL_TIMEZONE)
except ValueError:
time = None
return time
def parse_time(text: str, timezone = LOCAL_TIMEZONE) -> datetime.datetime:
def parse_time(text: str) -> datetime.datetime:
text = text.replace('\n', ' ')
text = text.strip()
@ -60,12 +62,12 @@ def parse_time(text: str, timezone = LOCAL_TIMEZONE) -> datetime.datetime:
raise RuntimeError(msg)
if time.tzinfo is None:
time = time.replace(tzinfo=timezone)
time = time.replace(tzinfo=LOCAL_TIMEZONE)
if time.tzinfo is None:
msg = 'Could not parse timezone: ' + text
raise RuntimeError(msg)
return time.astimezone(datetime.UTC)
return time
def parse_date(text: str) -> datetime.date:

View File

@ -19,9 +19,8 @@ def csv_safe_value(v: Any) -> str:
if isinstance(v, urllib.parse.ParseResult):
return v.geturl()
if isinstance(v, datetime.datetime):
if v.tzinfo is None or v.tzinfo != datetime.UTC:
msg = f'Timezone must be UTC: {v}'
raise ValueError(msg)
if v.tzinfo is None:
raise RuntimeError(v)
return str(v)

View File

@ -4,19 +4,27 @@ import pytest
from obsidian_import import obsidian
from .test_obsidian_vault import EXAMPLES, EXAMPLE_DATE, EXAMPLE_TIMEZONE
EXAMPLES = [
obsidian.Event(
datetime.time(12, 0, 0),
datetime.time(12, 0, 0),
'Ate',
'Lunch',
'instantly',
),
obsidian.Event(
datetime.time(20, 0, 0),
datetime.time(22, 0, 0),
'Watched',
'Tom and Jerry',
'on the *Television*',
),
obsidian.Event(None, None, None, None, 'Took a walk'),
obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
]
def test_parse_event_string():
formatted = '17:44 | Watched [[../../media/anime/Azumanga Daioh]]. Season 1 Episode 6: *Sports Fest*'
event = obsidian.parse_event_string(formatted, EXAMPLE_DATE, EXAMPLE_TIMEZONE)
assert event is not None
assert event.subject == 'Azumanga Daioh'
assert event.start_time is not None
@pytest.mark.parametrize('event', EXAMPLES)
def test_format_preserves_information(event: obsidian.Event):
formatted = obsidian.format_event_string(event, EXAMPLE_TIMEZONE)
assert obsidian.parse_event_string(formatted, EXAMPLE_DATE,
EXAMPLE_TIMEZONE) == event
formatted = obsidian.format_event_string(event)
assert obsidian.parse_event_string(formatted) == event

View File

@ -1,24 +1,19 @@
import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
from obsidian_import import obsidian
EXAMPLE_TIMEZONE = ZoneInfo('Europe/Copenhagen')
EXAMPLE_DATE = datetime.date(2020, 1, 1)
EXAMPLES = [
obsidian.Event(
datetime.datetime(2020, 1, 1, 12, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
datetime.datetime(2020, 1, 1, 12, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
datetime.time(12, 0, 0),
datetime.time(12, 0, 0),
'Ate',
'Lunch',
'instantly',
),
obsidian.Event(
datetime.datetime(2020, 1, 1, 20, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
datetime.datetime(2020, 1, 1, 22, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
datetime.time(20, 0, 0),
datetime.time(22, 0, 0),
'Watched',
'Tom and Jerry',
'on the *Television*',
@ -38,7 +33,7 @@ def test_write_internally():
vault.path_format = 'YYYY-MM-DD'
vault.template_file_path = Path('daily-template-file.md')
vault.add_events(EXAMPLE_DATE, EXAMPLES)
vault.add_events(datetime.date(2020, 1, 1), EXAMPLES)
assert len(vault.internal_file_text_cache) == 2
assert vault.internal_file_text_cache[