1
0

Ruff
Some checks failed
Run Python tests (through Pytest) / Test (push) Failing after 34s
Verify Python project can be installed, loaded and have version checked / Test (push) Successful in 29s

This commit is contained in:
Jon Michael Aanes 2024-10-23 21:30:23 +02:00
parent 3170d8e7a8
commit f82b7c8526
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
5 changed files with 105 additions and 48 deletions

View File

@ -5,15 +5,19 @@ Sub-module for importing time-based data into Obsidian.
import dataclasses import dataclasses
import datetime import datetime
from collections.abc import Iterator
from logging import getLogger from logging import getLogger
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from collections.abc import Iterator
from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file from personal_data.activity import (
from personal_data.activity import (ActivitySample, Label, ActivitySample,
RealizedActivitySample, heuristically_realize_samples, merge_adjacent_samples Label,
RealizedActivitySample,
heuristically_realize_samples,
merge_adjacent_samples,
) )
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
from .obsidian import Event, ObsidianVault from .obsidian import Event, ObsidianVault
@ -22,6 +26,7 @@ logger = getLogger(__name__)
Row = dict[str, Any] Row = dict[str, Any]
Rows = list[Row] Rows = list[Row]
def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]: def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
assert len(rows) > 0 assert len(rows) > 0
@ -48,7 +53,6 @@ def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
del event_data del event_data
def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int: def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
num_updated = 0 num_updated = 0
for row in rows: for row in rows:
@ -102,6 +106,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
return num_updated return num_updated
def escape_for_obsidian_link(link: str) -> str: def escape_for_obsidian_link(link: str) -> str:
return link.replace(':', ' ').replace('/', ' ').replace(' ', ' ') return link.replace(':', ' ').replace('/', ' ').replace(' ', ' ')
@ -113,8 +118,12 @@ class EventContent:
comment: str comment: str
def import_activity_sample_csv(vault: ObsidianVault, rows: Rows, def import_activity_sample_csv(
content_mapper, group_category: str | None = None) -> int: vault: ObsidianVault,
rows: Rows,
content_mapper,
group_category: str | None = None,
) -> int:
samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows))) samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
if group_category is not None: if group_category is not None:
@ -130,12 +139,19 @@ def import_activity_sample_csv(vault: ObsidianVault, rows: Rows,
def map_to_event(sample: RealizedActivitySample) -> Event: def map_to_event(sample: RealizedActivitySample) -> Event:
content = content_mapper(sample) content = content_mapper(sample)
expected_tz = datetime.timezone(datetime.timedelta(hours=2)) # TODO: Determine this in a more intelligent manner expected_tz = datetime.timezone(
return Event(sample.start_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(), datetime.timedelta(hours=2),
sample.end_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(), ) # TODO: Determine this in a more intelligent manner
verb=content.verb, return Event(
subject=escape_for_obsidian_link(content.subject), sample.start_at.astimezone(expected_tz)
comment=content.comment, .replace(second=0, microsecond=0)
.time(),
sample.end_at.astimezone(expected_tz)
.replace(second=0, microsecond=0)
.time(),
verb=content.verb,
subject=escape_for_obsidian_link(content.subject),
comment=content.comment,
) )
num_updated = 0 num_updated = 0
@ -150,13 +166,19 @@ def import_activity_sample_csv(vault: ObsidianVault, rows: Rows,
return num_updated return num_updated
def import_activity_sample_csv_from_file(vault: ObsidianVault, data_path: Path,
content_mapper, **kwargs) -> int: def import_activity_sample_csv_from_file(
vault: ObsidianVault,
data_path: Path,
content_mapper,
**kwargs,
) -> int:
rows = load_csv_file(data_path) rows = load_csv_file(data_path)
logger.info('Loaded CSV with %d lines (%s)', len(rows), data_path) logger.info('Loaded CSV with %d lines (%s)', len(rows), data_path)
num_updated = import_activity_sample_csv(vault, rows, content_mapper, **kwargs) num_updated = import_activity_sample_csv(vault, rows, content_mapper, **kwargs)
logger.info('Updated %d files', num_updated) logger.info('Updated %d files', num_updated)
def map_watched_series_content(sample: RealizedActivitySample) -> EventContent: def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
subject = sample.single_label_with_category('series.name') subject = sample.single_label_with_category('series.name')
comment = '{} Episode {}: *{}*'.format( comment = '{} Episode {}: *{}*'.format(
@ -165,29 +187,40 @@ def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
sample.single_label_with_category('episode.name'), sample.single_label_with_category('episode.name'),
) )
return EventContent( return EventContent(
verb='Watched', verb='Watched',
subject=subject, subject=subject,
comment=comment, comment=comment,
) )
def map_games_played_content(sample: RealizedActivitySample) -> EventContent: def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
subject = sample.single_label_with_category('game.name') subject = sample.single_label_with_category('game.name')
comment = '' comment = ''
return EventContent( return EventContent(
verb='Played', verb='Played',
subject=subject, subject=subject,
comment=comment, comment=comment,
) )
def import_watched_series_csv_from_file(vault: ObsidianVault) -> int: def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/show_episodes_watched.csv') data_path = Path('output/show_episodes_watched.csv')
return import_activity_sample_csv_from_file(vault, data_path, map_watched_series_content) return import_activity_sample_csv_from_file(
vault,
data_path,
map_watched_series_content,
)
def import_played_games_csv_from_file(vault: ObsidianVault) -> int: def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/games_played_playstation.csv') data_path = Path('output/games_played_playstation.csv')
return import_activity_sample_csv_from_file(vault, data_path, return import_activity_sample_csv_from_file(
map_games_played_content, vault,
group_category='game.name') data_path,
map_games_played_content,
group_category='game.name',
)
def import_data(obsidian_path: Path, dry_run=True): def import_data(obsidian_path: Path, dry_run=True):
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None) vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)

View File

@ -48,7 +48,7 @@ FILE_FORMAT = """
{blocks_post_events} {blocks_post_events}
""" """
MIDNIGHT = datetime.time(0,0,0) MIDNIGHT = datetime.time(0, 0, 0)
class ObsidianVault: class ObsidianVault:
@ -128,7 +128,7 @@ class ObsidianVault:
if contents.events == updated_events: if contents.events == updated_events:
return False return False
contents = dataclasses.replace(contents, events = updated_events) contents = dataclasses.replace(contents, events=updated_events)
if not self.read_only: if not self.read_only:
self._save_contents(date, contents) self._save_contents(date, contents)
return True return True
@ -148,7 +148,9 @@ class ObsidianVault:
ast = MARKDOWN_PARSER.parse(str(file_frontmatter)) ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
(pre_events, list_block_items, post_events) = find_events_list_block(ast) (pre_events, list_block_items, post_events) = find_events_list_block(ast)
events = frozenset(parse_event_string(list_item) for list_item in list_block_items) events = frozenset(
parse_event_string(list_item) for list_item in list_block_items
)
return FileContents(file_frontmatter.metadata, pre_events, events, post_events) return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
def _save_contents(self, date: datetime.date, contents: FileContents) -> None: def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
@ -163,9 +165,7 @@ class ObsidianVault:
events = list(contents.events) events = list(contents.events)
events.sort() events.sort()
events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT) events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
block_events = '\n'.join( block_events = '\n'.join('- ' + format_event_string(e) for e in events)
'- ' + format_event_string(e) for e in events
)
text = FILE_FORMAT.format( text = FILE_FORMAT.format(
blocks_pre_events=blocks_pre_events, blocks_pre_events=blocks_pre_events,
blocks_post_events=blocks_post_events, blocks_post_events=blocks_post_events,
@ -254,14 +254,26 @@ RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'
def parse_event_string(event_str: str) -> Event: def parse_event_string(event_str: str) -> Event:
if m := re.match( if m := re.match(
r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*'+RE_VERB+r'\s+'+RE_LINK_MD+r'\.?\s*(.*)$', r'^\s*'
+ RE_TIME_FORMAT
+ r'[ :\|-]*'
+ RE_VERB
+ r'\s+'
+ RE_LINK_MD
+ r'\.?\s*(.*)$',
event_str, event_str,
): ):
start = datetime.time.fromisoformat(m.group(1)) start = datetime.time.fromisoformat(m.group(1))
end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
return Event(start, end, m.group(3), m.group(4), m.group(5)) return Event(start, end, m.group(3), m.group(4), m.group(5))
if m := re.match( if m := re.match(
r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*'+RE_VERB+r'\s+'+RE_LINK_WIKI+r'\.?\s*(.*)$', r'^\s*'
+ RE_TIME_FORMAT
+ r'[ :\|-]*'
+ RE_VERB
+ r'\s+'
+ RE_LINK_WIKI
+ r'\.?\s*(.*)$',
event_str, event_str,
): ):
start = datetime.time.fromisoformat(m.group(1)) start = datetime.time.fromisoformat(m.group(1))

View File

@ -54,7 +54,7 @@ def heuristically_realize_samples(
* No samples overlap. * No samples overlap.
""" """
samples.sort(key = lambda x: x.end_at) samples.sort(key=lambda x: x.end_at)
previous_sample_end = None previous_sample_end = None
for sample in samples: for sample in samples:
@ -91,12 +91,14 @@ def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
def merge_adjacent_samples( def merge_adjacent_samples(
samples: list[RealizedActivitySample], group_category: str, samples: list[RealizedActivitySample],
group_category: str,
) -> list[RealizedActivitySample]: ) -> list[RealizedActivitySample]:
max_interval_between_samples = datetime.timedelta(minutes=5) max_interval_between_samples = datetime.timedelta(minutes=5)
def can_merge( def can_merge(
before: RealizedActivitySample, after: RealizedActivitySample, before: RealizedActivitySample,
after: RealizedActivitySample,
) -> bool: ) -> bool:
if before.single_label_with_category( if before.single_label_with_category(
group_category, group_category,

View File

@ -1,7 +1,7 @@
import dataclasses import dataclasses
import datetime
import logging import logging
import re import re
import datetime
from collections.abc import Iterator from collections.abc import Iterator
import bs4 import bs4
@ -67,18 +67,17 @@ class PsnProfilesScraper(Scraper):
) )
requests_util.setup_limiter( requests_util.setup_limiter(
self.session, self.session,
URL_API_ROOT+'/trophies/', URL_API_ROOT + '/trophies/',
expire_after=datetime.timedelta(days=14), expire_after=datetime.timedelta(days=14),
) )
def _scrape_games_overview(self) -> Iterator[dict]: def _scrape_games_overview(self) -> Iterator[dict]:
for page_num in range(1, 1000): for page_num in range(1, 1000):
logger.info('Getting Overview (page %d)', page_num) logger.info('Getting Overview (page %d)', page_num)
url = URL_GAMES_OVERVIEW.format(psn_id=secrets.PLAYSTATION_PSN_ID) url = URL_GAMES_OVERVIEW.format(psn_id=secrets.PLAYSTATION_PSN_ID)
response = self.session.get(url, params={'page': page_num}) response = self.session.get(url, params={'page': page_num})
if 'page' not in response.url: if 'page' not in response.url:
msg = 'Configuration error? psnprofiles.com made an redirection. This is possibly because your profile name wasn\'t exactly as expected. Please check it' msg = "Configuration error? psnprofiles.com made an redirection. This is possibly because your profile name wasn't exactly as expected. Please check it"
raise RuntimeError(msg) raise RuntimeError(msg)
response.raise_for_status() response.raise_for_status()
soup = bs4.BeautifulSoup(response.text, 'lxml') soup = bs4.BeautifulSoup(response.text, 'lxml')

View File

@ -1,19 +1,30 @@
import datetime import datetime
import pytest import pytest
from obsidian_import import obsidian from obsidian_import import obsidian
EXAMPLES = [ EXAMPLES = [
obsidian.Event(datetime.time(12, 0, 0), datetime.time(12, 0, 0), "Ate", obsidian.Event(
"Lunch", "instantly"), datetime.time(12, 0, 0),
obsidian.Event(datetime.time(20, 0, 0), datetime.time(22, 0, 0), datetime.time(12, 0, 0),
"Watched", "Tom and Jerry", "on the *Television*"), 'Ate',
obsidian.Event(None, None, None, None, "Took a walk"), 'Lunch',
obsidian.Event(None, None, None, None, "Watched [[Cyberpunk: Edgerunners]]."), 'instantly',
),
obsidian.Event(
datetime.time(20, 0, 0),
datetime.time(22, 0, 0),
'Watched',
'Tom and Jerry',
'on the *Television*',
),
obsidian.Event(None, None, None, None, 'Took a walk'),
obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
] ]
@pytest.mark.parametrize("event", EXAMPLES)
@pytest.mark.parametrize('event', EXAMPLES)
def test_format_preserves_information(event: obsidian.Event): def test_format_preserves_information(event: obsidian.Event):
formatted = obsidian.format_event_string(event) formatted = obsidian.format_event_string(event)
assert obsidian.parse_event_string(formatted) == event assert obsidian.parse_event_string(formatted) == event