
Compare commits

f82b7c85267434ff9a26b17afdc282d7c8bf196c..5255206cf4af194e92b56b31f81b47f9bbfd323f

No commits in common. "f82b7c85267434ff9a26b17afdc282d7c8bf196c" and "5255206cf4af194e92b56b31f81b47f9bbfd323f" have entirely different histories.

6 changed files with 52 additions and 148 deletions

File 1 of 6

@@ -5,19 +5,15 @@ Sub-module for importing time-based data into Obsidian.
 import dataclasses
 import datetime
-from collections.abc import Iterator
 from logging import getLogger
 from pathlib import Path
 from typing import Any
+from collections.abc import Iterator
 
-from personal_data.activity import (
-    ActivitySample,
-    Label,
-    RealizedActivitySample,
-    heuristically_realize_samples,
-    merge_adjacent_samples,
+from personal_data.csv_import import start_end, determine_possible_keys, load_csv_file
+from personal_data.activity import (ActivitySample, Label,
+        RealizedActivitySample, heuristically_realize_samples, merge_adjacent_samples
 )
-from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
 
 from .obsidian import Event, ObsidianVault
@@ -26,7 +22,6 @@ logger = getLogger(__name__)
 Row = dict[str, Any]
 Rows = list[Row]
 
-
 def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
     assert len(rows) > 0
@@ -53,6 +48,7 @@ def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
         del event_data
 
+
 def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0
     for row in rows:
@@ -106,7 +102,6 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
     return num_updated
 
-
 def escape_for_obsidian_link(link: str) -> str:
     return link.replace(':', ' ').replace('/', ' ').replace('  ', ' ')
@@ -118,12 +113,8 @@ class EventContent:
     comment: str
 
-def import_activity_sample_csv(
-    vault: ObsidianVault,
-    rows: Rows,
-    content_mapper,
-    group_category: str | None = None,
-) -> int:
+def import_activity_sample_csv(vault: ObsidianVault, rows: Rows,
+        content_mapper, group_category: str | None = None) -> int:
     samples = heuristically_realize_samples(list(iterate_samples_from_rows(rows)))
 
     if group_category is not None:
@@ -139,16 +130,9 @@ def import_activity_sample_csv(
     def map_to_event(sample: RealizedActivitySample) -> Event:
         content = content_mapper(sample)
-        expected_tz = datetime.timezone(
-            datetime.timedelta(hours=2),
-        )  # TODO: Determine this in a more intelligent manner
-        return Event(
-            sample.start_at.astimezone(expected_tz)
-            .replace(second=0, microsecond=0)
-            .time(),
-            sample.end_at.astimezone(expected_tz)
-            .replace(second=0, microsecond=0)
-            .time(),
+        expected_tz = datetime.timezone(datetime.timedelta(hours=2)) # TODO: Determine this in a more intelligent manner
+        return Event(sample.start_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
+            sample.end_at.astimezone(expected_tz).replace(second=0,microsecond=0).time(),
             verb=content.verb,
             subject=escape_for_obsidian_link(content.subject),
             comment=content.comment,
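An aside on the TODO above: the hardcoded UTC+2 offset could instead be derived from a named IANA zone via the standard library, which also handles DST. A minimal sketch; the zone name is an assumption for illustration, not something either commit specifies:

    # Hypothetical replacement for the fixed UTC+2 offset. 'Europe/Copenhagen'
    # is an assumed zone, not one taken from this repository.
    from zoneinfo import ZoneInfo
    expected_tz = ZoneInfo('Europe/Copenhagen')  # DST-aware, unlike timezone(timedelta(hours=2))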
@@ -166,19 +150,13 @@ def import_activity_sample_csv(
     return num_updated
 
-
-def import_activity_sample_csv_from_file(
-    vault: ObsidianVault,
-    data_path: Path,
-    content_mapper,
-    **kwargs,
-) -> int:
+def import_activity_sample_csv_from_file(vault: ObsidianVault, data_path: Path,
+        content_mapper, **kwargs) -> int:
     rows = load_csv_file(data_path)
     logger.info('Loaded CSV with %d lines (%s)', len(rows), data_path)
     num_updated = import_activity_sample_csv(vault, rows, content_mapper, **kwargs)
     logger.info('Updated %d files', num_updated)
 
-
 def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
     subject = sample.single_label_with_category('series.name')
     comment = '{} Episode {}: *{}*'.format(
@@ -192,7 +170,6 @@ def map_watched_series_content(sample: RealizedActivitySample) -> EventContent:
         comment=comment,
     )
 
-
 def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
     subject = sample.single_label_with_category('game.name')
     comment = ''
@@ -202,25 +179,15 @@ def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
         comment=comment,
     )
 
-
 def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
     data_path = Path('output/show_episodes_watched.csv')
-    return import_activity_sample_csv_from_file(
-        vault,
-        data_path,
-        map_watched_series_content,
-    )
+    return import_activity_sample_csv_from_file(vault, data_path, map_watched_series_content)
 
-
 def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
     data_path = Path('output/games_played_playstation.csv')
-    return import_activity_sample_csv_from_file(
-        vault,
-        data_path,
+    return import_activity_sample_csv_from_file(vault, data_path,
         map_games_played_content,
-        group_category='game.name',
-    )
+        group_category='game.name')
 
-
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
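For context on the escape_for_obsidian_link hunk above: the first two replaces swap characters that are not allowed in Obsidian link targets for spaces, and the final replace collapses the double spaces this can create (the double-space argument is an inferred reading of that line, since HTML whitespace collapsing obscures it in this capture). A quick check of the behaviour as reconstructed:

    def escape_for_obsidian_link(link: str) -> str:
        # ':' and '/' become spaces; the last replace collapses resulting '  ' runs.
        return link.replace(':', ' ').replace('/', ' ').replace('  ', ' ')

    assert escape_for_obsidian_link('Cyberpunk: Edgerunners') == 'Cyberpunk Edgerunners'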

File 2 of 6

@@ -148,9 +148,7 @@ class ObsidianVault:
         ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
         (pre_events, list_block_items, post_events) = find_events_list_block(ast)
-        events = frozenset(
-            parse_event_string(list_item) for list_item in list_block_items
-        )
+        events = frozenset(parse_event_string(list_item) for list_item in list_block_items)
         return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
 
     def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
@@ -165,7 +163,9 @@ class ObsidianVault:
         events = list(contents.events)
         events.sort()
         events.sort(key=lambda x: x.start_time or x.end_time or MIDNIGHT)
-        block_events = '\n'.join('- ' + format_event_string(e) for e in events)
+        block_events = '\n'.join(
+            '- ' + format_event_string(e) for e in events
+        )
         text = FILE_FORMAT.format(
             blocks_pre_events=blocks_pre_events,
             blocks_post_events=blocks_post_events,
@@ -254,26 +254,14 @@ RE_TIME_FORMAT = RE_TIME + r'(?:\s*\-\s*' + RE_TIME + r')?'
 def parse_event_string(event_str: str) -> Event:
     if m := re.match(
-        r'^\s*'
-        + RE_TIME_FORMAT
-        + r'[ :\|-]*'
-        + RE_VERB
-        + r'\s+'
-        + RE_LINK_MD
-        + r'\.?\s*(.*)$',
+        r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*'+RE_VERB+r'\s+'+RE_LINK_MD+r'\.?\s*(.*)$',
         event_str,
     ):
         start = datetime.time.fromisoformat(m.group(1))
         end = datetime.time.fromisoformat(m.group(2)) if m.group(2) else start
         return Event(start, end, m.group(3), m.group(4), m.group(5))
     if m := re.match(
-        r'^\s*'
-        + RE_TIME_FORMAT
-        + r'[ :\|-]*'
-        + RE_VERB
-        + r'\s+'
-        + RE_LINK_WIKI
-        + r'\.?\s*(.*)$',
+        r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*'+RE_VERB+r'\s+'+RE_LINK_WIKI+r'\.?\s*(.*)$',
         event_str,
     ):
         start = datetime.time.fromisoformat(m.group(1))
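Both parse_event_string hunks above only reflow the pattern concatenation; the resulting regex string is identical on both sides, since adjacent `+` operands ignore source layout. A self-contained check with stand-in fragments (the real RE_TIME_FORMAT, RE_VERB and RE_LINK_MD are defined elsewhere in the module and are not reproduced here):

    RE_TIME_FORMAT, RE_VERB, RE_LINK_MD = r'(\d\d:\d\d)', r'(\w+)', r'\[(\w+)\]'  # stand-ins
    multi_line = (
        r'^\s*'
        + RE_TIME_FORMAT
        + r'[ :\|-]*'
        + RE_VERB
        + r'\s+'
        + RE_LINK_MD
        + r'\.?\s*(.*)$'
    )
    condensed = r'^\s*' + RE_TIME_FORMAT + r'[ :\|-]*'+RE_VERB+r'\s+'+RE_LINK_MD+r'\.?\s*(.*)$'
    assert multi_line == condensed  # same pattern, different source formatting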

File 3 of 6

@@ -91,14 +91,12 @@ def mergable_labels(a: Sequence[Label], b: Sequence[Label]) -> Sequence[Label]:
 def merge_adjacent_samples(
-    samples: list[RealizedActivitySample],
-    group_category: str,
+    samples: list[RealizedActivitySample], group_category: str,
 ) -> list[RealizedActivitySample]:
     max_interval_between_samples = datetime.timedelta(minutes=5)
 
     def can_merge(
-        before: RealizedActivitySample,
-        after: RealizedActivitySample,
+        before: RealizedActivitySample, after: RealizedActivitySample,
     ) -> bool:
         if before.single_label_with_category(
             group_category,

File 4 of 6

@@ -1,5 +1,4 @@
 import dataclasses
-import datetime
 import logging
 import re
 from collections.abc import Iterator
@@ -18,7 +17,6 @@ logger = logging.getLogger(__name__)
 URL_API_ROOT = 'https://psnprofiles.com/'
 URL_PROFILE = URL_API_ROOT + '{psn_id}'
 URL_USER_GAME_TROPHIES = URL_API_ROOT + 'trophies/{game_id}/{psn_id}'
-URL_GAMES_OVERVIEW = URL_API_ROOT + '{psn_id}'
 
 def game_psnprofiles_id_from_url(relative_url: str) -> int:
@@ -44,8 +42,6 @@ class PsnProfilesScraper(Scraper):
         games_rows = list(self._scrape_games_overview())
         games_ids = {row['psnprofiles.game_id']: row['game.name'] for row in games_rows}
+        logger.info('Found %d games from overview', len(games_rows))
 
-        SCRAPE_FROM_OVERVIEW = False
-        if SCRAPE_FROM_OVERVIEW:
-            yield from games_rows
@@ -63,49 +59,24 @@ class PsnProfilesScraper(Scraper):
             self.session,
             URL_API_ROOT,
             per_minute=5,
-            expire_after=datetime.timedelta(hours=1),
         )
-        requests_util.setup_limiter(
-            self.session,
-            URL_API_ROOT + '/trophies/',
-            expire_after=datetime.timedelta(days=14),
-        )
 
     def _scrape_games_overview(self) -> Iterator[dict]:
-        for page_num in range(1, 1000):
-            logger.info('Getting Overview (page %d)', page_num)
-            url = URL_GAMES_OVERVIEW.format(psn_id=secrets.PLAYSTATION_PSN_ID)
-            response = self.session.get(url, params={'page': page_num})
-            if 'page' not in response.url:
-                msg = "Configuration error? psnprofiles.com made an redirection. This is possibly because your profile name wasn't exactly as expected. Please check it"
-                raise RuntimeError(msg)
-            response.raise_for_status()
-            soup = bs4.BeautifulSoup(response.text, 'lxml')
-            soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
-            games_on_page = list(self._iterate_games_from_games_table(soup))
-            yield from games_on_page
-            if len(games_on_page) == 0:
-                return
-
-    def _scrape_games_overview_old(self) -> Iterator[dict]:
         # Request to get overview
         logger.info('Getting Overview')
         url = URL_PROFILE.format(psn_id=secrets.PLAYSTATION_PSN_ID)
         response = self.session.get(url)
         response.raise_for_status()
-        now = parse_util.parse_response_datetime(response)
+        NOW = parse_util.parse_response_datetime(response)
 
         # Parse data
         soup = bs4.BeautifulSoup(response.content, 'lxml')
         soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
         yield from self._iterate_games_from_recent_tropies(soup, now)
         yield from self._iterate_games_from_games_table(soup)
 
     def _iterate_games_from_recent_tropies(self, soup, now) -> Iterator[dict]:
         # Recent trophies.
         soup_recent_tropies = soup.select('ul#recent-trophies > li')
-        assert len(soup_recent_tropies) > 0
+        assert len(soup_recent_tropies) > 0, url
 
         for row in soup_recent_tropies:
             cells = row.select_one('.info .box td').find_all('div')
@@ -120,7 +91,7 @@ class PsnProfilesScraper(Scraper):
                 cells[2].get_text().strip().removesuffix(' in').removesuffix(' ago')
             )
             gotten_at = parse_util.parse_duration(gotten_at)
-            time_acquired = now - gotten_at
+            time_acquired = NOW - gotten_at
 
             yield {
                 'game.name': game_name,
@@ -134,15 +105,10 @@ class PsnProfilesScraper(Scraper):
             del row, cells, time_acquired
 
     def _iterate_games_from_games_table(self, soup) -> Iterator[dict]:
         # Games table
         table_rows = soup.find(id='gamesTable').find_all('tr')
         assert len(table_rows) > 0, url
-
-        if title := table_rows[0].h2:
-            if title.get_text().strip() == 'No games found':
-                return
 
         for row in table_rows:
             cells = row.find_all('td')
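One subtlety in the `assert ..., url` lines on the incoming side: Python evaluates an assert's message expression only when the condition fails, so an out-of-scope name like `url` stays latent until the check actually trips, at which point it raises NameError rather than AssertionError. A standalone illustration (not code from this repository):

    def check(rows):
        assert len(rows) > 0, undefined_url  # message expression is evaluated lazily

    check([1])   # passes; `undefined_url` is never evaluated
    # check([])  # would raise NameError, not AssertionError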

File 5 of 6

@@ -54,16 +54,12 @@ def get_session(
     ignore_cache: bool,
 ) -> requests.Session:
     assert isinstance(with_cfscrape, bool)
-    if cfscrape:
-        session_class = CachedCfScrape
-        if ignore_cache:
-            logger.warning('HTTP cache disabled')
-            return cfscrape.create_scraper()
-    else:
-        session_class = requests_cache.CachedSession
     if ignore_cache:
         logger.warning('HTTP cache disabled')
         return requests.Session()
+    if cfscrape:
+        session_class = CachedCfScrape
 
     session = session_class(
         OUTPUT_PATH / 'web_cache',
         cookies=cookiejar,
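If the marker reconstruction above is right, this hunk changes behaviour rather than just layout: the outgoing side returned cfscrape.create_scraper() when the cache was disabled and cfscrape was requested, while the incoming side always falls back to a plain requests.Session(), and only binds session_class inside the `if cfscrape:` branch. A simplified sketch of the incoming control flow, with strings standing in for the session classes:

    def get_session_sketch(cfscrape, ignore_cache: bool) -> str:
        if ignore_cache:
            return 'requests.Session()'
        if cfscrape:
            session_class = 'CachedCfScrape'
        return session_class  # NameError when cfscrape is falsy: nothing bound it

    get_session_sketch(cfscrape=object(), ignore_cache=False)   # fine
    # get_session_sketch(cfscrape=None, ignore_cache=False)     # would raise NameError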

File 6 of 6

@@ -1,30 +1,19 @@
 import datetime
 
 import pytest
 
 from obsidian_import import obsidian
 
 EXAMPLES = [
-    obsidian.Event(
-        datetime.time(12, 0, 0),
-        datetime.time(12, 0, 0),
-        'Ate',
-        'Lunch',
-        'instantly',
-    ),
-    obsidian.Event(
-        datetime.time(20, 0, 0),
-        datetime.time(22, 0, 0),
-        'Watched',
-        'Tom and Jerry',
-        'on the *Television*',
-    ),
-    obsidian.Event(None, None, None, None, 'Took a walk'),
-    obsidian.Event(None, None, None, None, 'Watched [[Cyberpunk: Edgerunners]].'),
+    obsidian.Event(datetime.time(12, 0, 0), datetime.time(12, 0, 0), "Ate",
+        "Lunch", "instantly"),
+    obsidian.Event(datetime.time(20, 0, 0), datetime.time(22, 0, 0),
+        "Watched", "Tom and Jerry", "on the *Television*"),
+    obsidian.Event(None, None, None, None, "Took a walk"),
+    obsidian.Event(None, None, None, None, "Watched [[Cyberpunk: Edgerunners]]."),
 ]
 
-
-@pytest.mark.parametrize('event', EXAMPLES)
+@pytest.mark.parametrize("event", EXAMPLES)
 def test_format_preserves_information(event: obsidian.Event):
     formatted = obsidian.format_event_string(event)
     assert obsidian.parse_event_string(formatted) == event