
Compare commits


8 Commits

SHA1        Message                          Date
6d1f23b1a0  Fixed PSN Profiles timezones     2024-11-24 19:07:11 +01:00
            (checks: "Run Python tests (through Pytest) / Test (push)" failing after 34s; "Verify Python project can be installed, loaded and have version checked / Test (push)" successful in 29s)
91e5562298  Parse datetimes to UTC.          2024-11-24 18:44:04 +01:00
ec27c7e4e0  More strict fetching validation  2024-11-24 18:33:17 +01:00
188249e39f  More robust                      2024-11-24 17:58:36 +01:00
47761eb4d7  Improved path parsing            2024-11-24 17:49:00 +01:00
086611909e  Convert back to UTC ASAP         2024-11-24 17:13:05 +01:00
78a3b3b767  Use full datetimes events        2024-11-24 17:08:41 +01:00
05c870402d  Generalized testing              2024-11-24 17:08:19 +01:00
7 changed files with 81 additions and 72 deletions
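
Taken together, these commits move the importer from naive, local datetime.time values to timezone-aware UTC datetime.datetime values, converting back to a display timezone only when rendering. A minimal sketch of that convention, not part of the diff (Europe/Copenhagen is the zone hard-coded in the changes; the values are illustrative):

import datetime
from zoneinfo import ZoneInfo

display_tz = ZoneInfo('Europe/Copenhagen')

# A wall-clock time read from a note or a scrape, interpreted in its source zone...
parsed = datetime.datetime(2024, 11, 24, 19, 7, tzinfo=display_tz)

# ...is normalized to UTC as early as possible and kept that way internally...
stored = parsed.astimezone(datetime.UTC)

# ...and only converted back to the display zone when formatting output.
assert f'{stored.astimezone(display_tz):%H:%M}' == '19:07'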

View File

@@ -4,8 +4,9 @@ Sub-module for importing time-based data into Obsidian.
"""
import dataclasses
from zoneinfo import ZoneInfo
import datetime
from collections.abc import Iterator
from collections.abc import Iterator, Iterable
from logging import getLogger
from pathlib import Path
from typing import Any
@@ -119,7 +120,6 @@ class EventContent:
subject: str
comment: str
def import_activity_sample_csv(
vault: ObsidianVault,
rows: Rows,
@@ -131,9 +131,11 @@ def import_activity_sample_csv(
if group_category is not None:
samples = merge_adjacent_samples(list(samples), group_category)
timezone = ZoneInfo('Europe/Copenhagen') # TODO: Parameterize in an intelligent manner
samples_per_date: dict[datetime.date, list[RealizedActivitySample]] = {}
for sample in samples:
date: datetime.date = sample.start_at.date()
date: datetime.date = sample.start_at.astimezone(timezone).date()
samples_per_date.setdefault(date, [])
samples_per_date[date].append(sample)
del date, sample
@@ -141,16 +143,9 @@ def import_activity_sample_csv(
def map_to_event(sample: RealizedActivitySample) -> Event:
content = content_mapper(sample)
expected_tz = datetime.timezone(
datetime.timedelta(hours=2),
) # TODO: Determine this in a more intelligent manner
return Event(
sample.start_at.astimezone(expected_tz)
.replace(second=0, microsecond=0)
.time(),
sample.end_at.astimezone(expected_tz)
.replace(second=0, microsecond=0)
.time(),
sample.start_at,
sample.end_at,
verb=content.verb,
subject=escape_for_obsidian_link(content.subject),
comment=content.comment,
@@ -158,7 +153,7 @@ def import_activity_sample_csv(
num_updated = 0
for date, samples in samples_per_date.items():
for date, samples in list(samples_per_date.items()):
events = [map_to_event(sample) for sample in samples]
was_updated = vault.add_events(date, events)
@@ -216,6 +211,9 @@ def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/games_played.csv')
if not data_path.exists():
logger.warning('Skipping import of played games: %s is missing', data_path)
return 0
return import_activity_sample_csv_from_file(
vault,
data_path,
@@ -243,8 +241,8 @@ def import_data(obsidian_path: Path, dry_run=True):
num_updated = import_step_counts_csv(vault, rows)
logger.info('Updated %d files', num_updated)
import_watched_series_csv_from_file(vault)
import_played_games_csv_from_file(vault)
import_watched_series_csv_from_file(vault)
num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
logger.info('dirty files in cache: %d', num_dirty)
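
Not part of the diff: the switch from sample.start_at.date() to sample.start_at.astimezone(timezone).date() matters near midnight, since a UTC timestamp can fall on a different calendar day than its local equivalent. A small illustration with made-up values:

import datetime
from zoneinfo import ZoneInfo

timezone = ZoneInfo('Europe/Copenhagen')
start_at = datetime.datetime(2024, 11, 24, 23, 30, tzinfo=datetime.UTC)

print(start_at.date())                        # 2024-11-24 (UTC calendar day)
print(start_at.astimezone(timezone).date())   # 2024-11-25 (local calendar day, UTC+1)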

View File

@@ -6,6 +6,8 @@ from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import enforce_typing
import frontmatter
import marko
@@ -16,10 +18,11 @@ logger = getLogger(__name__)
StatisticKey = str
@enforce_typing.enforce_types
@dataclasses.dataclass(frozen=True, order=True)
class Event:
start_time: datetime.time | None
end_time: datetime.time | None
start_time: datetime.datetime | None
end_time: datetime.datetime | None
verb: str | None
subject: str | None
comment: str
@@ -29,13 +32,13 @@ class Event:
assert ':' not in self.subject
assert '/' not in self.subject
@dataclasses.dataclass(frozen=True)
class FileContents:
frontmatter: dict[str, Any]
blocks_pre_events: list
events: frozenset[Event]
blocks_post_events: list
timezone: ZoneInfo
@dataclasses.dataclass(frozen=False)
@@ -54,9 +57,6 @@ FILE_FORMAT = """
{blocks_post_events}
"""
MIDNIGHT = datetime.time(0, 0, 0)
class ObsidianVault:
def __init__(
self,
@@ -136,6 +136,8 @@ class ObsidianVault:
return contents.events
def _load_date_contents(self, date: datetime.date) -> FileContents | None:
timezone = ZoneInfo('Europe/Copenhagen') # TODO: Parameterize in an intelligent manner
file_path = self._date_file_path(date)
text = self._load_file_text(file_path) or self._load_file_text(
self._daily_template_path(),
@@ -147,9 +149,9 @@
ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
(pre_events, list_block_items, post_events) = find_events_list_block(ast)
events = frozenset(
parse_event_string(list_item) for list_item in list_block_items
parse_event_string(list_item, date, timezone) for list_item in list_block_items
)
return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
return FileContents(file_frontmatter.metadata, pre_events, events, post_events, timezone)
def _save_date_contents(self, date: datetime.date, contents: FileContents) -> None:
blocks_pre_events = ''.join(
@@ -160,10 +162,15 @@
)
events = list(contents.events)
events.sort(key=lambda x: x.comment or '')
events.sort(key=lambda x: x.subject or '')
events.sort(key=lambda x: x.verb or '')
events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
block_events = '\n'.join('- ' + format_event_string(e) for e in events)
date_sentinel = datetime.datetime(1900, 1, 1, 1, 1, 1, tzinfo=contents.timezone)
events.sort(key=lambda x: x.start_time or x.end_time or date_sentinel)
formatted_events = ['- ' + format_event_string(e, tz = contents.timezone) for e in events]
formatted_events = list(dict.fromkeys(formatted_events))
block_events = '\n'.join(formatted_events)
post = frontmatter.Post(
content=FILE_FORMAT.format(
@@ -225,7 +232,7 @@ def find_events_list_block(ast) -> tuple[list, list[str], list]:
isinstance(block, marko.block.Heading)
and block.children[0].children.lower() == 'events'
):
events_block = ast.children[block_i + 1]
events_block = ast.children[block_i + 1] if block_i + 1 < len(ast.children) else None
if isinstance(events_block, marko.block.List):
offset = 2
event_texts = [
@@ -240,7 +247,7 @@ def find_events_list_block(ast) -> tuple[list, list[str], list]:
return (blocks, [], [])
def format_event_string(event: Event) -> str:
def format_event_string(event: Event, tz: ZoneInfo) -> str:
assert event is not None
if (
event.start_time is None
@@ -251,9 +258,9 @@ def format_event_string(event: Event) -> str:
return event.comment
buf = []
buf.append(f'{event.start_time:%H:%M}')
buf.append(f'{event.start_time.astimezone(tz):%H:%M}')
if event.end_time and event.end_time != event.start_time:
buf.append(f'-{event.end_time:%H:%M}')
buf.append(f'-{event.end_time.astimezone(tz):%H:%M}')
buf.append(' | ')
buf.append(event.verb)
buf.append(' [[')
@@ -266,12 +273,14 @@ def format_event_string(event: Event) -> str:
RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+?))?)'
RE_VERB = r'(\w+(?:ed|te))'
RE_LINK_MD = r'\[([^\]:/]*)\]\(?:[^)]*\)'
RE_LINK_WIKI = r'\[\[([^\]:/]*)\]\]'
RE_LINK_WIKI = r'\[\[(?:[^\]:]*\/)?([^\]:/]*)\]\]'
RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'
def parse_event_string(event_str: str) -> Event:
def parse_event_string(event_str: str, date: datetime.date, timezone: ZoneInfo) -> Event:
"""Parses event string for the given date.
"""
if m := re.match(
r'^\s*'
+ RE_TIME_FORMAT
@@ -282,10 +291,9 @@ def parse_event_string(event_str: str) -> Event:
+ r'\.?\s*(.*)$',
event_str,
):
start = datetime.time.fromisoformat(m.group(1))
end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
return Event(start, end, m.group(3), m.group(4), m.group(5))
if m := re.match(
start_time = datetime.time.fromisoformat(m.group(1))
end_time = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start_time
elif m := re.match(
r'^\s*'
+ RE_TIME_FORMAT
+ r'[ :\|-]*'
@@ -295,8 +303,13 @@
+ r'\.?\s*(.*)$',
event_str,
):
start = datetime.time.fromisoformat(m.group(1))
end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
return Event(start, end, m.group(3), m.group(4), m.group(5))
start_time = datetime.time.fromisoformat(m.group(1))
end_time = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start_time
else:
logger.info('Could not parse format: %s', event_str)
return Event(None, None, None, None, event_str)
start = datetime.datetime.combine(date, start_time, timezone).astimezone(datetime.UTC)
end = datetime.datetime.combine(date, end_time, timezone).astimezone(datetime.UTC)
return Event(start, end, m.group(3), m.group(4), m.group(5))
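
Not part of the diff: parse_event_string and format_event_string now round-trip through UTC, so the event string keeps wall-clock times while the Event object keeps aware UTC datetimes. A sketch of that contract, assuming the Europe/Copenhagen default from _load_date_contents (the date and time are illustrative):

import datetime
from zoneinfo import ZoneInfo

tz = ZoneInfo('Europe/Copenhagen')
date = datetime.date(2020, 1, 1)

# Parsing: a wall-clock time from the note is anchored to the note's date and
# timezone, then normalized to UTC (mirroring the datetime.combine calls above).
start = datetime.datetime.combine(date, datetime.time(17, 44), tz).astimezone(datetime.UTC)

# Formatting: the UTC value is rendered back in the same timezone, restoring '17:44'.
assert f'{start.astimezone(tz):%H:%M}' == '17:44'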

View File

@@ -21,6 +21,8 @@ URL_USER_GAME_TROPHIES = URL_API_ROOT + 'trophies/{game_id}/{psn_id}'
URL_GAMES_OVERVIEW = URL_API_ROOT + '{psn_id}'
PSN_PROFILES_DEFAULT_TIMEZONE=datetime.UTC
def game_psnprofiles_id_from_url(relative_url: str) -> int:
m = re.match(r'/(?:trophy|trophies)/(\d+)\-(?:[\w-]+)(/[\w-]*)?', relative_url)
result = m.group(1)
@@ -195,7 +197,7 @@ class PsnProfiles(Scraper):
if 'Missing\nTimestamp' in cells[2].get_text().strip():
continue
cells[2].span.span.nobr.sup.extract()
gotten_at = parse_util.parse_time(cells[2].get_text())
gotten_at = parse_util.parse_time(cells[2].get_text(), timezone=PSN_PROFILES_DEFAULT_TIMEZONE)
yield {
'game.name': game_name,
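
Not part of the diff: pinning PSN_PROFILES_DEFAULT_TIMEZONE to datetime.UTC avoids the shift the old local-timezone default introduced, assuming (as the constant implies) that the site's naive timestamps already represent UTC. Illustrative values:

import datetime
from zoneinfo import ZoneInfo

naive = datetime.datetime(2024, 11, 24, 18, 0)  # what the trophy table cell parses to

# Old behaviour: attach the machine's local zone, shifting the instant by an hour here.
print(naive.replace(tzinfo=ZoneInfo('Europe/Copenhagen')).astimezone(datetime.UTC))  # 17:00 UTC
# New behaviour: attach UTC and keep the instant as written.
print(naive.replace(tzinfo=datetime.UTC))                                            # 18:00 UTC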

View File

@@ -41,14 +41,12 @@ LOCAL_TIMEZONE = NOW.astimezone().tzinfo
def try_parse(text: str, fmt: str) -> datetime.datetime | None:
try:
time = datetime.datetime.strptime(text, fmt) # noqa: DTZ007
if time.tzinfo is None:
time = time.replace(tzinfo=LOCAL_TIMEZONE)
except ValueError:
time = None
return time
def parse_time(text: str) -> datetime.datetime:
def parse_time(text: str, timezone = LOCAL_TIMEZONE) -> datetime.datetime:
text = text.replace('\n', ' ')
text = text.strip()
@@ -62,12 +60,12 @@ def parse_time(text: str) -> datetime.datetime:
raise RuntimeError(msg)
if time.tzinfo is None:
time = time.replace(tzinfo=LOCAL_TIMEZONE)
time = time.replace(tzinfo=timezone)
if time.tzinfo is None:
msg = 'Could not parse timezone: ' + text
raise RuntimeError(msg)
return time
return time.astimezone(datetime.UTC)
def parse_date(text: str) -> datetime.date:
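
Not part of the diff: a condensed sketch of the revised parse_time contract (the real helper tries several formats through try_parse; the single format below is only an example). Naive parses get the caller-supplied timezone attached, and every return value is normalized to UTC:

import datetime

LOCAL_TIMEZONE = datetime.datetime.now().astimezone().tzinfo

def parse_time_sketch(text: str, timezone=LOCAL_TIMEZONE) -> datetime.datetime:
    time = datetime.datetime.strptime(text.strip(), '%Y-%m-%d %H:%M:%S')
    if time.tzinfo is None:
        time = time.replace(tzinfo=timezone)
    return time.astimezone(datetime.UTC)

parse_time_sketch('2024-11-24 18:44:04')                         # local wall clock, returned in UTC
parse_time_sketch('2024-11-24 18:44:04', timezone=datetime.UTC)  # already UTC, instant unchanged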

View File

@@ -19,8 +19,9 @@ def csv_safe_value(v: Any) -> str:
if isinstance(v, urllib.parse.ParseResult):
return v.geturl()
if isinstance(v, datetime.datetime):
if v.tzinfo is None:
raise RuntimeError(v)
if v.tzinfo is None or v.tzinfo != datetime.UTC:
msg = f'Timezone must be UTC: {v}'
raise ValueError(msg)
return str(v)
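
Not part of the diff: the stricter check accepts only values whose tzinfo is exactly datetime.UTC, so aware values in other zones have to be converted before being written out. Illustrative:

import datetime
from zoneinfo import ZoneInfo

utc_value = datetime.datetime(2024, 11, 24, 17, 44, tzinfo=datetime.UTC)  # accepted
local_value = utc_value.astimezone(ZoneInfo('Europe/Copenhagen'))         # same instant, rejected
naive_value = datetime.datetime(2024, 11, 24, 17, 44)                     # rejected
local_value.astimezone(datetime.UTC)                                      # convert first, then write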

View File

@@ -4,27 +4,19 @@ import pytest
from obsidian_import import obsidian
EXAMPLES = [
obsidian.Event(
datetime.time(12, 0, 0),
datetime.time(12, 0, 0),
'Ate',
'Lunch',
'instantly',
),
obsidian.Event(
datetime.time(20, 0, 0),
datetime.time(22, 0, 0),
'Watched',
'Tom and Jerry',
'on the *Television*',
),
obsidian.Event(None, None, None, None, 'Took a walk'),
obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
]
from .test_obsidian_vault import EXAMPLES, EXAMPLE_DATE, EXAMPLE_TIMEZONE
def test_parse_event_string():
formatted = '17:44 | Watched [[../../media/anime/Azumanga Daioh]]. Season 1 Episode 6: *Sports Fest*'
event = obsidian.parse_event_string(formatted, EXAMPLE_DATE, EXAMPLE_TIMEZONE)
assert event is not None
assert event.subject == 'Azumanga Daioh'
assert event.start_time is not None
@pytest.mark.parametrize('event', EXAMPLES)
def test_format_preserves_information(event: obsidian.Event):
formatted = obsidian.format_event_string(event)
assert obsidian.parse_event_string(formatted) == event
formatted = obsidian.format_event_string(event, EXAMPLE_TIMEZONE)
assert obsidian.parse_event_string(formatted, EXAMPLE_DATE,
EXAMPLE_TIMEZONE) == event

View File

@@ -1,19 +1,24 @@
import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
from obsidian_import import obsidian
EXAMPLE_TIMEZONE = ZoneInfo('Europe/Copenhagen')
EXAMPLE_DATE = datetime.date(2020, 1, 1)
EXAMPLES = [
obsidian.Event(
datetime.time(12, 0, 0),
datetime.time(12, 0, 0),
datetime.datetime(2020, 1, 1, 12, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
datetime.datetime(2020, 1, 1, 12, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
'Ate',
'Lunch',
'instantly',
),
obsidian.Event(
datetime.time(20, 0, 0),
datetime.time(22, 0, 0),
datetime.datetime(2020, 1, 1, 20, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
datetime.datetime(2020, 1, 1, 22, 0, 0, tzinfo=EXAMPLE_TIMEZONE),
'Watched',
'Tom and Jerry',
'on the *Television*',
@@ -33,7 +38,7 @@ def test_write_internally():
vault.path_format = 'YYYY-MM-DD'
vault.template_file_path = Path('daily-template-file.md')
vault.add_events(datetime.date(2020, 1, 1), EXAMPLES)
vault.add_events(EXAMPLE_DATE, EXAMPLES)
assert len(vault.internal_file_text_cache) == 2
assert vault.internal_file_text_cache[