Compare commits

2 Commits

SHA1        Message                                 Date
f82b7c8526  Ruff                                    2024-10-23 21:30:23 +02:00
            Some checks failed:
              Run Python tests (through Pytest) / Test (push): failing after 34s
              Verify Python project can be installed, loaded and have version checked / Test (push): successful in 29s
3170d8e7a8  PSN Profiles: Implemented pagination    2024-10-23 21:29:53 +02:00
6 changed files with 148 additions and 52 deletions

View File

@@ -5,15 +5,19 @@ Sub-module for importing time-based data into Obsidian.
 import dataclasses
 import datetime
+from collections.abc import Iterator
 from logging import getLogger
 from pathlib import Path
 from typing import Any
-from collections.abc import Iterator
-from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file
-from personal_data.activity import (ActivitySample, Label,
-    RealizedActivitySample, heuristically_realize_samples, merge_adjacent_samples
+from personal_data.activity import (
+    ActivitySample,
+    Label,
+    RealizedActivitySample,
+    heuristically_realize_samples,
+    merge_adjacent_samples,
 )
+from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
 
 from .obsidian import Event, ObsidianVault
@@ -22,6 +26,7 @@ logger = getLogger(__name__)
 Row = dict[str, Any]
 Rows = list[Row]
 
+
 def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
     assert len(rows) > 0
@@ -48,7 +53,6 @@ def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
         del event_data
 
-
 def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0
     for row in rows:
@@ -102,6 +106,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
     return num_updated
 
+
 def escape_for_obsidian_link(link: str) -> str:
     return link.replace(':', ' ').replace('/', ' ').replace(' ', ' ')
@@ -113,8 +118,12 @@ class EventContent:
     comment: str
 
-def import_activity_sample_csv(vault: ObsidianVault, rows: Rows,
-        content_mapper, group_category: str | None = None) -> int:
+def import_activity_sample_csv(
+    vault: ObsidianVault,
+    rows: Rows,
+    content_mapper,
+    group_category: str | None = None,
+) -> int:
     samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
 
     if group_category is not None:
@@ -130,9 +139,16 @@ def import_activity_sample_csv(vault: ObsidianVault, rows: Rows,
     def map_to_event(sample: RealizedActivitySample) -> Event:
         content = content_mapper(sample)
-        expected_tz = datetime.timezone(datetime.timedelta(hours=2)) # TODO: Determine this in a more intelligent manner
-        return Event(sample.start_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
-            sample.end_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
+        expected_tz = datetime.timezone(
+            datetime.timedelta(hours=2),
+        )  # TODO: Determine this in a more intelligent manner
+        return Event(
+            sample.start_at.astimezone(expected_tz)
+            .replace(second=0, microsecond=0)
+            .time(),
+            sample.end_at.astimezone(expected_tz)
+            .replace(second=0, microsecond=0)
+            .time(),
             verb=content.verb,
             subject=escape_for_obsidian_link(content.subject),
             comment=content.comment,
@@ -150,13 +166,19 @@ def import_activity_sample_csv(vault: ObsidianVault, rows: Rows,
     return num_updated
 
-def import_activity_sample_csv_from_file(vault: ObsidianVault, data_path: Path,
-        content_mapper, **kwargs) -> int:
+
+def import_activity_sample_csv_from_file(
+    vault: ObsidianVault,
+    data_path: Path,
+    content_mapper,
+    **kwargs,
+) -> int:
     rows = load_csv_file(data_path)
     logger.info('Loaded CSV with %d lines (%s)', len(rows), data_path)
     num_updated = import_activity_sample_csv(vault, rows, content_mapper, **kwargs)
     logger.info('Updated %d files', num_updated)
 
 def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
     subject = sample.single_label_with_category('series.name')
     comment = '{} Episode {}: *{}*'.format(
@@ -170,6 +192,7 @@ def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
         comment=comment,
     )
 
+
 def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
     subject = sample.single_label_with_category('game.name')
     comment = ''
@@ -179,15 +202,25 @@ def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
         comment=comment,
     )
 
+
 def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
     data_path = Path('output/show_episodes_watched.csv')
-    return import_activity_sample_csv_from_file(vault, data_path, map_watched_series_content)
+    return import_activity_sample_csv_from_file(
+        vault,
+        data_path,
+        map_watched_series_content,
+    )
+
 
 def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
     data_path = Path('output/games_played_playstation.csv')
-    return import_activity_sample_csv_from_file(vault, data_path,
+    return import_activity_sample_csv_from_file(
+        vault,
+        data_path,
         map_games_played_content,
-        group_category='game.name')
+        group_category='game.name',
+    )
+
 
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
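One subtlety in the unchanged context above: `read_only=dry_run and 'silent' or None` uses the pre-conditional-expression `X and A or B` idiom. Because `'silent'` is truthy, it behaves identically to the modern conditional expression; a minimal sketch (the helper name is hypothetical, not from the repository):

def read_only_mode(dry_run: bool) -> str | None:
    # Equivalent to `dry_run and 'silent' or None`; the and/or idiom only
    # works because 'silent' is truthy, a falsy value would fall through
    # to the `or` branch.
    return 'silent' if dry_run else None

assert read_only_mode(True) == 'silent'
assert read_only_mode(False) is None
assert (True and 'silent' or None) == read_only_mode(True)
assert (False and 'silent' or None) == read_only_mode(False)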

View File

@@ -48,7 +48,7 @@ FILE_FORMAT = """
 {blocks_post_events}
 """
 
-MIDNIGHT = datetime.time(0,0,0)
+MIDNIGHT = datetime.time(0, 0, 0)
 
 
 class ObsidianVault:
@@ -128,7 +128,7 @@ class ObsidianVault:
         if contents.events == updated_events:
             return False
-        contents = dataclasses.replace(contents, events = updated_events)
+        contents = dataclasses.replace(contents, events=updated_events)
         if not self.read_only:
             self._save_contents(date, contents)
         return True
@@ -148,7 +148,9 @@ class ObsidianVault:
         ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
         (pre_events, list_block_items, post_events) = find_events_list_block(ast)
-        events = frozenset(parse_event_string(list_item) for list_item in list_block_items)
+        events = frozenset(
+            parse_event_string(list_item) for list_item in list_block_items
+        )
         return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
 
     def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
@@ -163,9 +165,7 @@ class ObsidianVault:
         events = list(contents.events)
         events.sort()
         events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
-        block_events = '\n'.join(
-            '- ' + format_event_string(e) for e in events
-        )
+        block_events = '\n'.join('- ' + format_event_string(e) for e in events)
         text = FILE_FORMAT.format(
             blocks_pre_events=blocks_pre_events,
             blocks_post_events=blocks_post_events,
@@ -254,14 +254,26 @@ RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'
 def parse_event_string(event_str: str) -> Event:
     if m := re.match(
-        r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*'+RE_VERB+r'\s+'+RE_LINK_MD+r'\.?\s*(.*)$',
+        r'^\s*'
+        + RE_TIME_FORMAT
+        + r'[ :\|-]*'
+        + RE_VERB
+        + r'\s+'
+        + RE_LINK_MD
+        + r'\.?\s*(.*)$',
         event_str,
     ):
         start = datetime.time.fromisoformat(m.group(1))
         end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
         return Event(start, end, m.group(3), m.group(4), m.group(5))
     if m := re.match(
-        r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*'+RE_VERB+r'\s+'+RE_LINK_WIKI+r'\.?\s*(.*)$',
+        r'^\s*'
+        + RE_TIME_FORMAT
+        + r'[ :\|-]*'
+        + RE_VERB
+        + r'\s+'
+        + RE_LINK_WIKI
+        + r'\.?\s*(.*)$',
         event_str,
     ):
         start = datetime.time.fromisoformat(m.group(1))
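A detail worth noting in the `_save_contents` hunk: the two consecutive `sort` calls rely on Python's sort being stable. The first call establishes a secondary ordering over all `Event` fields; the second re-orders by start time while ties keep the order from the first pass. A minimal sketch of the pattern:

import datetime

records = [
    (datetime.time(9, 0), 'b'),
    (datetime.time(9, 0), 'a'),
    (datetime.time(8, 0), 'c'),
]
records.sort()                    # secondary ordering: the whole tuple
records.sort(key=lambda r: r[0])  # primary ordering: start time (stable)
assert records == [
    (datetime.time(8, 0), 'c'),
    (datetime.time(9, 0), 'a'),
    (datetime.time(9, 0), 'b'),
]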

View File

@@ -54,7 +54,7 @@ def heuristically_realize_samples(
     * No samples overlap.
     """
-    samples.sort(key = lambda x: x.end_at)
+    samples.sort(key=lambda x: x.end_at)
 
     previous_sample_end = None
     for sample in samples:
@@ -91,12 +91,14 @@ def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
 def merge_adjacent_samples(
-    samples: list[RealizedActivitySample], group_category: str,
+    samples: list[RealizedActivitySample],
+    group_category: str,
 ) -> list[RealizedActivitySample]:
     max_interval_between_samples = datetime.timedelta(minutes=5)
 
     def can_merge(
-        before: RealizedActivitySample, after: RealizedActivitySample,
+        before: RealizedActivitySample,
+        after: RealizedActivitySample,
     ) -> bool:
         if before.single_label_with_category(
             group_category,
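`merge_adjacent_samples` collapses consecutive samples with the same `group_category` label when the gap between them is at most `max_interval_between_samples` (five minutes). A self-contained sketch of that interval-merging pattern, using plain `(start, end, key)` tuples in place of `RealizedActivitySample`:

import datetime

MAX_GAP = datetime.timedelta(minutes=5)

def merge_adjacent(
    samples: list[tuple[datetime.datetime, datetime.datetime, str]],
) -> list[tuple[datetime.datetime, datetime.datetime, str]]:
    # Merge intervals that share a key and are separated by at most MAX_GAP.
    merged: list[tuple[datetime.datetime, datetime.datetime, str]] = []
    for start, end, key in sorted(samples):
        if merged:
            prev_start, prev_end, prev_key = merged[-1]
            if key == prev_key and start - prev_end <= MAX_GAP:
                merged[-1] = (prev_start, max(prev_end, end), key)
                continue
        merged.append((start, end, key))
    return merged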

View File

@@ -1,4 +1,5 @@
 import dataclasses
+import datetime
 import logging
 import re
 from collections.abc import Iterator
@@ -17,6 +18,7 @@ logger = logging.getLogger(__name__)
 URL_API_ROOT = 'https://psnprofiles.com/'
 URL_PROFILE = URL_API_ROOT + '{psn_id}'
 URL_USER_GAME_TROPHIES = URL_API_ROOT + 'trophies/{game_id}/{psn_id}'
+URL_GAMES_OVERVIEW = URL_API_ROOT + '{psn_id}'
 
 
 def game_psnprofiles_id_from_url(relative_url: str) -> int:
@@ -42,6 +44,8 @@ class PsnProfilesScraper(Scraper):
         games_rows = list(self._scrape_games_overview())
         games_ids = {row['psnprofiles.game_id']: row['game.name'] for row in games_rows}
 
+        logger.info('Found %d games from overview', len(games_rows))
+
         SCRAPE_FROM_OVERVIEW = False
         if SCRAPE_FROM_OVERVIEW:
             yield from games_rows
@@ -59,24 +63,49 @@ class PsnProfilesScraper(Scraper):
             self.session,
             URL_API_ROOT,
             per_minute=5,
+            expire_after=datetime.timedelta(hours=1),
+        )
+        requests_util.setup_limiter(
+            self.session,
+            URL_API_ROOT + '/trophies/',
+            expire_after=datetime.timedelta(days=14),
         )
 
     def _scrape_games_overview(self) -> Iterator[dict]:
+        for page_num in range(1, 1000):
+            logger.info('Getting Overview (page %d)', page_num)
+            url = URL_GAMES_OVERVIEW.format(psn_id=secrets.PLAYSTATION_PSN_ID)
+            response = self.session.get(url, params={'page': page_num})
+            if 'page' not in response.url:
+                msg = "Configuration error? psnprofiles.com made an redirection. This is possibly because your profile name wasn't exactly as expected. Please check it"
+                raise RuntimeError(msg)
+            response.raise_for_status()
+            soup = bs4.BeautifulSoup(response.text, 'lxml')
+            soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
+            games_on_page = list(self._iterate_games_from_games_table(soup))
+            yield from games_on_page
+            if len(games_on_page) == 0:
+                return
+
+    def _scrape_games_overview_old(self) -> Iterator[dict]:
         # Request to get overview
         logger.info('Getting Overview')
         url = URL_PROFILE.format(psn_id=secrets.PLAYSTATION_PSN_ID)
         response = self.session.get(url)
         response.raise_for_status()
 
-        NOW = parse_util.parse_response_datetime(response)
+        now = parse_util.parse_response_datetime(response)
 
         # Parse data
         soup = bs4.BeautifulSoup(response.content, 'lxml')
         soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
 
-        # Recent trophies.
+        yield from self._iterate_games_from_recent_tropies(soup, now)
+        yield from self._iterate_games_from_games_table(soup)
+
+    def _iterate_games_from_recent_tropies(self, soup, now) -> Iterator[dict]:
         soup_recent_tropies = soup.select('ul#recent-trophies > li')
-        assert len(soup_recent_tropies) > 0, url
+        assert len(soup_recent_tropies) > 0
 
         for row in soup_recent_tropies:
             cells = row.select_one('.info .box td').find_all('div')
@@ -91,7 +120,7 @@ class PsnProfilesScraper(Scraper):
                 cells[2].get_text().strip().removesuffix(' in').removesuffix(' ago')
             )
             gotten_at = parse_util.parse_duration(gotten_at)
-            time_acquired = NOW - gotten_at
+            time_acquired = now - gotten_at
 
             yield {
                 'game.name': game_name,
@@ -105,10 +134,15 @@ class PsnProfilesScraper(Scraper):
             del row, cells, time_acquired
 
+    def _iterate_games_from_games_table(self, soup) -> Iterator[dict]:
         # Games table
         table_rows = soup.find(id='gamesTable').find_all('tr')
         assert len(table_rows) > 0, url
+        if title := table_rows[0].h2:
+            if title.get_text().strip() == 'No games found':
+                return
+
         for row in table_rows:
             cells = row.find_all('td')
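The new `_scrape_games_overview` requests `?page=1`, `?page=2`, and so on, and stops at the first page whose games table comes back empty (the commit checks for a 'No games found' row rather than relying on an error status). The fetch-until-empty skeleton, with `fetch_page` as a hypothetical stand-in for the HTTP request plus table parsing:

from collections.abc import Callable, Iterator

def iterate_pages(fetch_page: Callable[[int], list[dict]]) -> Iterator[dict]:
    for page_num in range(1, 1000):  # hard upper bound, as in the commit
        rows = fetch_page(page_num)
        yield from rows
        if len(rows) == 0:  # the first empty page ends the listing
            return

# Usage against a fake three-page source:
pages = {1: [{'id': 1}], 2: [{'id': 2}], 3: []}
assert list(iterate_pages(lambda n: pages.get(n, []))) == [{'id': 1}, {'id': 2}]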

View File

@@ -54,12 +54,16 @@ def get_session(
     ignore_cache: bool,
 ) -> requests.Session:
     assert isinstance(with_cfscrape, bool)
-    session_class = requests_cache.CachedSession
-    if ignore_cache:
-        logger.warning('HTTP cache disabled')
-        return requests.Session()
     if cfscrape:
         session_class = CachedCfScrape
+        if ignore_cache:
+            logger.warning('HTTP cache disabled')
+            return cfscrape.create_scraper()
+    else:
+        session_class = requests_cache.CachedSession
+        if ignore_cache:
+            logger.warning('HTTP cache disabled')
+            return requests.Session()
     session = session_class(
         OUTPUT_PATH / 'web_cache',
         cookies=cookiejar,
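This hunk fixes an ordering bug: previously the `ignore_cache` early return ran before the `cfscrape` check, so disabling the cache always produced a plain `requests.Session`, silently dropping Cloudflare handling. The fixed shape is "pick the implementation first, then decide caching inside that branch"; a runnable sketch with stand-in classes (the real code uses cfscrape and requests_cache sessions):

class PlainSession: ...
class PlainScraper: ...
class CachedSession: ...
class CachedScraper: ...

def make_session(use_scraper: bool, ignore_cache: bool):
    if use_scraper:
        # Disabling the cache keeps the Cloudflare-capable implementation.
        return PlainScraper() if ignore_cache else CachedScraper()
    return PlainSession() if ignore_cache else CachedSession()

assert isinstance(make_session(True, True), PlainScraper)
assert isinstance(make_session(False, True), PlainSession)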

View File

@@ -1,19 +1,30 @@
 import datetime
+
 import pytest
+
 from obsidian_import import obsidian
 
 EXAMPLES = [
-    obsidian.Event(datetime.time(12, 0, 0), datetime.time(12, 0, 0), "Ate",
-        "Lunch", "instantly"),
-    obsidian.Event(datetime.time(20, 0, 0), datetime.time(22, 0, 0),
-        "Watched", "Tom and Jerry", "on the *Television*"),
-    obsidian.Event(None, None, None, None, "Took a walk"),
-    obsidian.Event(None, None, None, None, "Watched [[Cyberpunk: Edgerunners]]."),
+    obsidian.Event(
+        datetime.time(12, 0, 0),
+        datetime.time(12, 0, 0),
+        'Ate',
+        'Lunch',
+        'instantly',
+    ),
+    obsidian.Event(
+        datetime.time(20, 0, 0),
+        datetime.time(22, 0, 0),
+        'Watched',
+        'Tom and Jerry',
+        'on the *Television*',
+    ),
+    obsidian.Event(None, None, None, None, 'Took a walk'),
+    obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
 ]
 
-@pytest.mark.parametrize("event", EXAMPLES)
+
+@pytest.mark.parametrize('event', EXAMPLES)
 def test_format_preserves_information(event: obsidian.Event):
     formatted = obsidian.format_event_string(event)
     assert obsidian.parse_event_string(formatted) == event
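The test is a round-trip property check: formatting an `Event` and re-parsing the result must reproduce the original, which guards both the formatter and the two parsing regexes reworked above. New syntax variants can be covered by appending to `EXAMPLES`; a hypothetical extra case (not from the commit, assuming single-word verbs round-trip like 'Ate' and 'Watched' do):

EXAMPLES.append(
    obsidian.Event(
        datetime.time(7, 30, 0),
        datetime.time(8, 0, 0),
        'Ran',
        'Morning route',
        'easy pace',
    ),
)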