
Compare commits


No commits in common. "6d1f23b1a08ca15f9814893abb743d3c93fcc9a1" and "8d9e6aefa4852073eb28b4576955fbeb2c539d2d" have entirely different histories.

7 changed files with 72 additions and 81 deletions

View File

@@ -4,9 +4,8 @@ Sub-module for importing time-based data into Obsidian.
 """
 import dataclasses
-from zoneinfo import ZoneInfo
 import datetime
-from collections.abc import Iterator, Iterable
+from collections.abc import Iterator
 from logging import getLogger
 from pathlib import Path
 from typing import Any
@@ -120,6 +119,7 @@ class EventContent:
     subject: str
     comment: str


 def import_activity_sample_csv(
     vault: ObsidianVault,
     rows: Rows,
@@ -131,11 +131,9 @@ def import_activity_sample_csv(
     if group_category is not None:
         samples = merge_adjacent_samples(list(samples), group_category)

-    timezone = ZoneInfo('Europe/Copenhagen')  # TODO: Parameterize in an intelligent manner
     samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
     for sample in samples:
-        date: datetime.date = sample.start_at.astimezone(timezone).date()
+        date: datetime.date = sample.start_at.date()
         samples_per_date.setdefault(date, [])
         samples_per_date[date].append(sample)
     del date, sample
@@ -143,9 +141,16 @@ def import_activity_sample_csv(
     def map_to_event(sample: RealizedActivitySample) -> Event:
         content = content_mapper(sample)
+        expected_tz = datetime.timezone(
+            datetime.timedelta(hours=2),
+        )  # TODO: Determine this in a more intelligent manner
         return Event(
-            sample.start_at,
-            sample.end_at,
+            sample.start_at.astimezone(expected_tz)
+            .replace(second=0, microsecond=0)
+            .time(),
+            sample.end_at.astimezone(expected_tz)
+            .replace(second=0, microsecond=0)
+            .time(),
             verb=content.verb,
             subject=escape_for_obsidian_link(content.subject),
             comment=content.comment,
@@ -153,7 +158,7 @@ def import_activity_sample_csv(
     num_updated = 0
-    for date, samples in list(samples_per_date.items()):
+    for date, samples in samples_per_date.items():
         events = [map_to_event(sample) for sample in samples]
         was_updated = vault.add_events(date, events)
@@ -211,9 +216,6 @@ def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
 def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
     data_path = Path('output/games_played.csv')
-    if not data_path.exists():
-        logger.warning('Skipping import of played games: %s is missing', data_path)
-        return 0
     return import_activity_sample_csv_from_file(
         vault,
         data_path,
@@ -241,8 +243,8 @@ def import_data(obsidian_path: Path, dry_run=True):
     num_updated = import_step_counts_csv(vault, rows)
     logger.info('Updated %d files', num_updated)

-    import_played_games_csv_from_file(vault)
     import_watched_series_csv_from_file(vault)
+    import_played_games_csv_from_file(vault)

     num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
     logger.info('dirty files in cache: %d', num_dirty)
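
For readers skimming the diff: the new body of map_to_event above boils down to the following standalone sketch. The sample timestamp is invented, and the fixed +02:00 offset only mirrors the TODO comment, not any real configuration.

import datetime

expected_tz = datetime.timezone(datetime.timedelta(hours=2))
start_at = datetime.datetime(2024, 5, 26, 18, 30, 45, tzinfo=datetime.UTC)

# Shift the aware timestamp into the fixed offset, drop sub-minute precision,
# and keep only the naive wall-clock time that the Event dataclass now stores.
start_time = (
    start_at.astimezone(expected_tz)
    .replace(second=0, microsecond=0)
    .time()
)
print(start_time)  # 20:30:00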

View File

@@ -6,8 +6,6 @@ from decimal import Decimal
 from logging import getLogger
 from pathlib import Path
 from typing import Any
-from zoneinfo import ZoneInfo

-import enforce_typing
 import frontmatter
 import marko
@@ -18,11 +16,10 @@ logger = getLogger(__name__)
 StatisticKey = str


-@enforce_typing.enforce_types
 @dataclasses.dataclass(frozen=True, order=True)
 class Event:
-    start_time: datetime.datetime | None
-    end_time: datetime.datetime | None
+    start_time: datetime.time | None
+    end_time: datetime.time | None
     verb: str | None
     subject: str | None
     comment: str
@@ -32,13 +29,13 @@ class Event:
         assert ':' not in self.subject
         assert '/' not in self.subject


 @dataclasses.dataclass(frozen=True)
 class FileContents:
     frontmatter: dict[str, Any]
     blocks_pre_events: list
     events: frozenset[Event]
     blocks_post_events: list
-    timezone: ZoneInfo


 @dataclasses.dataclass(frozen=False)
@@ -57,6 +54,9 @@ FILE_FORMAT = """
 {blocks_post_events}
 """

+MIDNIGHT = datetime.time(0, 0, 0)


 class ObsidianVault:
     def __init__(
         self,
@@ -136,8 +136,6 @@ class ObsidianVault:
         return contents.events

     def _load_date_contents(self, date: datetime.date) -> FileContents | None:
-        timezone = ZoneInfo('Europe/Copenhagen')  # TODO: Parameterize in an intelligent manner
         file_path = self._date_file_path(date)
         text = self._load_file_text(file_path) or self._load_file_text(
             self._daily_template_path(),
@@ -149,9 +147,9 @@ class ObsidianVault:
         ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
         (pre_events, list_block_items, post_events) = find_events_list_block(ast)
         events = frozenset(
-            parse_event_string(list_item, date, timezone) for list_item in list_block_items
+            parse_event_string(list_item) for list_item in list_block_items
         )
-        return FileContents(file_frontmatter.metadata, pre_events, events, post_events, timezone)
+        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)

     def _save_date_contents(self, date: datetime.date, contents: FileContents) -> None:
         blocks_pre_events = ''.join(
@@ -162,15 +160,10 @@ class ObsidianVault:
         )

         events = list(contents.events)
-        events.sort(key=lambda x: x.comment or '')
         events.sort(key=lambda x: x.subject or '')
         events.sort(key=lambda x: x.verb or '')
-        date_sentinel = datetime.datetime(1900, 1, 1, 1, 1, 1, tzinfo=contents.timezone)
-        events.sort(key=lambda x: x.start_time or x.end_time or date_sentinel)
-        formatted_events = ['- ' + format_event_string(e, tz = contents.timezone) for e in events]
-        formatted_events = list(dict.fromkeys(formatted_events))
-        block_events = '\n'.join(formatted_events)
+        events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
+        block_events = '\n'.join('- ' + format_event_string(e) for e in events)

         post = frontmatter.Post(
             content=FILE_FORMAT.format(
@@ -232,7 +225,7 @@ def find_events_list_block(ast) -> tuple[list, list[str], list]:
             isinstance(block, marko.block.Heading)
             and block.children[0].children.lower() == 'events'
         ):
-            events_block = ast.children[block_i + 1] if block_i + 1 < len(ast.children) else None
+            events_block = ast.children[block_i + 1]
             if isinstance(events_block, marko.block.List):
                 offset = 2
                 event_texts = [
@@ -247,7 +240,7 @@ def find_events_list_block(ast) -> tuple[list, list[str], list]:
     return (blocks, [], [])


-def format_event_string(event: Event, tz: ZoneInfo) -> str:
+def format_event_string(event: Event) -> str:
     assert event is not None
     if (
         event.start_time is None
@@ -258,9 +251,9 @@ def format_event_string(event: Event, tz: ZoneInfo) -> str:
         return event.comment

     buf = []
-    buf.append(f'{event.start_time.astimezone(tz):%H:%M}')
+    buf.append(f'{event.start_time:%H:%M}')
     if event.end_time and event.end_time != event.start_time:
-        buf.append(f'-{event.end_time.astimezone(tz):%H:%M}')
+        buf.append(f'-{event.end_time:%H:%M}')
     buf.append(' | ')
     buf.append(event.verb)
     buf.append(' [[')
@@ -273,14 +266,12 @@ def format_event_string(event: Event, tz: ZoneInfo) -> str:
 RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+?))?)'
 RE_VERB = r'(\w+(?:ed|te))'
 RE_LINK_MD = r'\[([^\]:/]*)\]\(?:[^)]*\)'
-RE_LINK_WIKI = r'\[\[(?:[^\]:]*\/)?([^\]:/]*)\]\]'
+RE_LINK_WIKI = r'\[\[([^\]:/]*)\]\]'

 RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'


-def parse_event_string(event_str: str, date: datetime.date, timezone: ZoneInfo) -> Event:
-    """Parses event string for the given date.
-    """
+def parse_event_string(event_str: str) -> Event:
     if m := re.match(
         r'^\s*'
         + RE_TIME_FORMAT
@@ -291,9 +282,10 @@ def parse_event_string(event_str: str, date: datetime.date, timezone: ZoneInfo)
         + r'\.?\s*(.*)$',
         event_str,
     ):
-        start_time = datetime.time.fromisoformat(m.group(1))
-        end_time = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start_time
-    elif m := re.match(
+        start = datetime.time.fromisoformat(m.group(1))
+        end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
+        return Event(start, end, m.group(3), m.group(4), m.group(5))
+    if m := re.match(
         r'^\s*'
         + RE_TIME_FORMAT
         + r'[ :\|-]*'
@@ -303,13 +295,8 @@ def parse_event_string(event_str: str, date: datetime.date, timezone: ZoneInfo)
         + r'\.?\s*(.*)$',
         event_str,
     ):
-        start_time = datetime.time.fromisoformat(m.group(1))
-        end_time = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start_time
-    else:
+        start = datetime.time.fromisoformat(m.group(1))
+        end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
+        return Event(start, end, m.group(3), m.group(4), m.group(5))
     logger.info('Could not parse format: %s', event_str)
     return Event(None, None, None, None, event_str)
-    start = datetime.datetime.combine(date, start_time, timezone).astimezone(datetime.UTC)
-    end = datetime.datetime.combine(date, end_time, timezone).astimezone(datetime.UTC)
-    return Event(start, end, m.group(3), m.group(4), m.group(5))
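
The round trip that format_event_string and parse_event_string now perform on naive times can be illustrated with a small self-contained sketch; RE_TIME and RE_TIME_FORMAT are copied from the hunk above, while the example line and its contents are made up.

import datetime
import re

RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+?))?)'
RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'

# Format two naive times the way the event string does...
start, end = datetime.time(20, 0), datetime.time(22, 0)
line = f'{start:%H:%M}-{end:%H:%M} | Watched [[Tom and Jerry]].'

# ...and read them back the same way the parser does.
m = re.match(r'^\s*' + RE_TIME_FORMAT, line)
parsed_start = datetime.time.fromisoformat(m.group(1))
parsed_end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else parsed_start
assert (parsed_start, parsed_end) == (start, end)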

View File

@@ -21,8 +21,6 @@ URL_USER_GAME_TROPHIES = URL_API_ROOT + 'trophies/{game_id}/{psn_id}'
 URL_GAMES_OVERVIEW = URL_API_ROOT + '{psn_id}'

-PSN_PROFILES_DEFAULT_TIMEZONE=datetime.UTC


 def game_psnprofiles_id_from_url(relative_url: str) -> int:
     m = re.match(r'/(?:trophy|trophies)/(\d+)\-(?:[\w-]+)(/[\w-]*)?', relative_url)
     result = m.group(1)
@@ -197,7 +195,7 @@ class PsnProfiles(Scraper):
             if 'Missing\nTimestamp' in cells[2].get_text().strip():
                 continue
             cells[2].span.span.nobr.sup.extract()
-            gotten_at = parse_util.parse_time(cells[2].get_text(), timezone=PSN_PROFILES_DEFAULT_TIMEZONE)
+            gotten_at = parse_util.parse_time(cells[2].get_text())

             yield {
                 'game.name': game_name,

View File

@@ -41,12 +41,14 @@ LOCAL_TIMEZONE = NOW.astimezone().tzinfo
 def try_parse(text: str, fmt: str) -> datetime.datetime | None:
     try:
         time = datetime.datetime.strptime(text, fmt)  # noqa: DTZ007
+        if time.tzinfo is None:
+            time = time.replace(tzinfo=LOCAL_TIMEZONE)
     except ValueError:
         time = None
     return time


-def parse_time(text: str, timezone = LOCAL_TIMEZONE) -> datetime.datetime:
+def parse_time(text: str) -> datetime.datetime:
     text = text.replace('\n', ' ')
     text = text.strip()
@@ -60,12 +62,12 @@ def parse_time(text: str, timezone = LOCAL_TIMEZONE) -> datetime.datetime:
         raise RuntimeError(msg)

     if time.tzinfo is None:
-        time = time.replace(tzinfo=timezone)
+        time = time.replace(tzinfo=LOCAL_TIMEZONE)

     if time.tzinfo is None:
         msg = 'Could not parse timezone: ' + text
         raise RuntimeError(msg)
-    return time.astimezone(datetime.UTC)
+    return time


 def parse_date(text: str) -> datetime.date:
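
The practical effect is that parse_time now returns datetimes carrying the machine's local timezone instead of converting them to UTC; a minimal sketch, with an invented timestamp string and format:

import datetime

LOCAL_TIMEZONE = datetime.datetime.now().astimezone().tzinfo

time = datetime.datetime.strptime('2024-05-26 13:45', '%Y-%m-%d %H:%M')
if time.tzinfo is None:
    time = time.replace(tzinfo=LOCAL_TIMEZONE)

print(time)                           # aware, in the local offset (new behaviour)
print(time.astimezone(datetime.UTC))  # roughly what the old code returned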

View File

@@ -19,9 +19,8 @@ def csv_safe_value(v: Any) -> str:
     if isinstance(v, urllib.parse.ParseResult):
         return v.geturl()
     if isinstance(v, datetime.datetime):
-        if v.tzinfo is None or v.tzinfo != datetime.UTC:
-            msg = f'Timezone must be UTC: {v}'
-            raise ValueError(msg)
+        if v.tzinfo is None:
+            raise RuntimeError(v)
     return str(v)
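
In other words, the CSV helper now only requires datetimes to be timezone-aware rather than specifically UTC; a quick illustration of the two checks, with values invented for the example:

import datetime

aware_cet = datetime.datetime(2024, 5, 26, 13, 45,
                              tzinfo=datetime.timezone(datetime.timedelta(hours=2)))
naive = datetime.datetime(2024, 5, 26, 13, 45)

# Old check: rejected anything that was not exactly UTC.
print(aware_cet.tzinfo is None or aware_cet.tzinfo != datetime.UTC)  # True  -> rejected
# New check: only rejects naive datetimes.
print(aware_cet.tzinfo is None)  # False -> accepted
print(naive.tzinfo is None)      # True  -> rejected with RuntimeError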

View File

@@ -4,19 +4,27 @@ import pytest
 from obsidian_import import obsidian

-from .test_obsidian_vault import EXAMPLES, EXAMPLE_DATE, EXAMPLE_TIMEZONE
-
-def test_parse_event_string():
-    formatted = '17:44 | Watched [[../../media/anime/Azumanga Daioh]]. Season 1 Episode 6: *Sports Fest*'
-    event = obsidian.parse_event_string(formatted, EXAMPLE_DATE, EXAMPLE_TIMEZONE)
-    assert event is not None
-    assert event.subject == 'Azumanga Daioh'
-    assert event.start_time is not None
+EXAMPLES = [
+    obsidian.Event(
+        datetime.time(12, 0, 0),
+        datetime.time(12, 0, 0),
+        'Ate',
+        'Lunch',
+        'instantly',
+    ),
+    obsidian.Event(
+        datetime.time(20, 0, 0),
+        datetime.time(22, 0, 0),
+        'Watched',
+        'Tom and Jerry',
+        'on the *Television*',
+    ),
+    obsidian.Event(None, None, None, None, 'Took a walk'),
+    obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
+]


 @pytest.mark.parametrize('event', EXAMPLES)
 def test_format_preserves_information(event: obsidian.Event):
-    formatted = obsidian.format_event_string(event, EXAMPLE_TIMEZONE)
-    assert obsidian.parse_event_string(formatted, EXAMPLE_DATE,
-                                       EXAMPLE_TIMEZONE) == event
+    formatted = obsidian.format_event_string(event)
+    assert obsidian.parse_event_string(formatted) == event

View File

@@ -1,24 +1,19 @@
 import datetime
 from pathlib import Path
-from zoneinfo import ZoneInfo

 from obsidian_import import obsidian

-EXAMPLE_TIMEZONE = ZoneInfo('Europe/Copenhagen')
-EXAMPLE_DATE = datetime.date(2020, 1, 1)
-
 EXAMPLES = [
     obsidian.Event(
-        datetime.datetime(2020, 1, 1, 12, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
-        datetime.datetime(2020, 1, 1, 12, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
+        datetime.time(12, 0, 0),
+        datetime.time(12, 0, 0),
         'Ate',
         'Lunch',
         'instantly',
     ),
     obsidian.Event(
-        datetime.datetime(2020, 1, 1, 20, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
-        datetime.datetime(2020, 1, 1, 22, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
+        datetime.time(20, 0, 0),
+        datetime.time(22, 0, 0),
         'Watched',
         'Tom and Jerry',
         'on the *Television*',
@@ -38,7 +33,7 @@ def test_write_internally():
     vault.path_format = 'YYYY-MM-DD'
     vault.template_file_path = Path('daily-template-file.md')

-    vault.add_events(EXAMPLE_DATE, EXAMPLES)
+    vault.add_events(datetime.date(2020, 1, 1), EXAMPLES)

     assert len(vault.internal_file_text_cache) == 2
     assert vault.internal_file_text_cache[