1
0

Compare commits

..

No commits in common. "b67089f9111991ca956f0cbc2e6c7d975cee3bd9" and "ce89103c325ae5e7763c61f804bf11062c5e3940" have entirely different histories.

7 changed files with 25 additions and 57 deletions

View File

@ -213,7 +213,7 @@ def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
def import_played_games_csv_from_file(vault: ObsidianVault) -> int: def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/games_played.csv') data_path = Path('output/games_played_playstation.csv')
return import_activity_sample_csv_from_file( return import_activity_sample_csv_from_file(
vault, vault,
data_path, data_path,

View File

@ -18,13 +18,12 @@ URL_PROFILE_MOUNTS = (
'https://eu.finalfantasyxiv.com/lodestone/character/{character_id}/mount/' 'https://eu.finalfantasyxiv.com/lodestone/character/{character_id}/mount/'
) )
FFXIV_ARR_NAME = 'Final Fantasy XIV: A Realm Reborn' FORMAT_DATE_HEADER = '%d/%m/%YYYY'
FFXIV_ARR_RELEASE_DATE = datetime.date(2013,8,27)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class LodestoneAchievement(Scraper): class LodestoneAchievementScraper(Scraper):
dataset_name = 'games_played' dataset_name = 'games_played_playstation'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
def scrape(self): def scrape(self):
@ -68,8 +67,7 @@ class LodestoneAchievement(Scraper):
trophy_icon = trophy_icon.src trophy_icon = trophy_icon.src
yield { yield {
'game.name': FFXIV_ARR_NAME, 'game.name': 'Final Fantasy XIV: A Realm Reborn',
'game.release_date': FFXIV_ARR_RELEASE_DATE,
'me.last_played_time': time_acquired, 'me.last_played_time': time_acquired,
# Trophy Data # Trophy Data
'trophy.name': trophy_name, 'trophy.name': trophy_name,

View File

@ -43,7 +43,7 @@ def iterate_watched_episodes_of_series(client, series_id: str):
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class JellyfinWatchHistory(Scraper): class JellyfinWatchHistoryScraper(Scraper):
dataset_name = 'show_episodes_watched' dataset_name = 'show_episodes_watched'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

View File

@ -1,5 +1,4 @@
import dataclasses import dataclasses
import datetime
import logging import logging
import re import re
from collections.abc import Iterator from collections.abc import Iterator
@ -31,10 +30,8 @@ MAX_NUMBER_GAMES_TO_PARSE = 10000
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class PsnProfiles(Scraper): class PsnProfilesScraper(Scraper):
"""Downloads all trophies for the given user.""" dataset_name = 'games_played_playstation'
dataset_name = 'games_played'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
@staticmethod @staticmethod
@ -127,16 +124,6 @@ class PsnProfiles(Scraper):
d['me.last_played_time'] = time_played d['me.last_played_time'] = time_played
yield d yield d
def _parse_game_release_date(self, soup: bs4.BeautifulSoup) -> datetime.date:
table_rows = soup.select('table.gameInfo tr')
for row in table_rows:
cells = row.select('td')
if cells[0].get_text() in {'Release', 'Releases'}:
text = cells[1].get_text()
dates = re.findall(r'\w+\s+\d+,\s+\d{4}', text)
return min(parse_util.parse_date(date) for date in dates)
assert False, 'Could not find release date'
def _scrape_game_trophies( def _scrape_game_trophies(
self, self,
psnprofiles_id: int, psnprofiles_id: int,
@ -156,14 +143,8 @@ class PsnProfiles(Scraper):
# Parse data # Parse data
soup = bs4.BeautifulSoup(response.content, 'lxml') soup = bs4.BeautifulSoup(response.content, 'lxml')
# Normalize before parsing trophies
soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False) soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
# Parse release year
game_release_date = self._parse_game_release_date(soup)
assert game_release_date
# Remove redundant # Remove redundant
for redundant in soup.select('.wide-ad'): for redundant in soup.select('.wide-ad'):
redundant.extract() redundant.extract()
@ -193,13 +174,11 @@ class PsnProfiles(Scraper):
yield { yield {
'game.name': game_name, 'game.name': game_name,
'game.release_date': game_release_date,
'me.last_played_time': gotten_at, 'me.last_played_time': gotten_at,
# Trophy Data # Trophy Data
'trophy.name': trophy_name, 'trophy.name': trophy_name,
'trophy.desc': trophy_desc, 'trophy.desc': trophy_desc,
'trophy.icon': trophy_icon, 'trophy.icon': trophy_icon,
# Ids
'psnprofiles.game_id': psnprofiles_id, 'psnprofiles.game_id': psnprofiles_id,
} }

View File

@ -21,20 +21,16 @@ FORMAT_DATE_HEADER = '%d/%m/%YYYY'
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class SteamAchievement(Scraper): class SteamAchievementScraper(Scraper):
"""Downloads all trophies for the given user.""" dataset_name = 'games_played_TODO'
dataset_name = 'games_played'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
def scrape(self) -> Iterator[dict[str, Any]]: def scrape(self) -> Iterator[dict[str, Any]]:
username: str = secrets.STEAM_USERNAME username = secrets.STEAM_USERNAME
appids = list(self._determine_appids_from_recent_activity(username)) for appid in self.determine_appids_from_recent_activity(username):
logger.info('Found %d Steam Apps', len(appids)) yield from self.scrape_app(username, appid)
for appid in appids:
yield from self._scrape_app_achievements(username, appid)
def _determine_appids_from_recent_activity(self, username: str) -> Iterator[int]: def determine_appids_from_recent_activity(self, username: str) -> Iterator[int]:
url = URL_USER_RECENT_ACTIVITY.format( url = URL_USER_RECENT_ACTIVITY.format(
username=username, username=username,
) )
@ -51,9 +47,7 @@ class SteamAchievement(Scraper):
appid = int(href.split('/')[-1]) appid = int(href.split('/')[-1])
yield appid yield appid
def _scrape_app_achievements( def scrape_app(self, username: str, appid: int) -> Iterator[dict[str, Any]]:
self, username: str, appid: int,
) -> Iterator[dict[str, Any]]:
url = URL_GAME_ACHIVEMENTS.format( url = URL_GAME_ACHIVEMENTS.format(
username=username, username=username,
appid=appid, appid=appid,
@ -61,6 +55,8 @@ class SteamAchievement(Scraper):
response = self.session.get(url) response = self.session.get(url)
response.raise_for_status() response.raise_for_status()
NOW = parse_util.parse_response_datetime(response)
# Parse data # Parse data
soup = bs4.BeautifulSoup(response.content, 'lxml') soup = bs4.BeautifulSoup(response.content, 'lxml')
@ -77,7 +73,7 @@ class SteamAchievement(Scraper):
for entry in soup.select('.achieveRow'): for entry in soup.select('.achieveRow'):
trophy_name: str = entry.select_one('h3').get_text() trophy_name: str = entry.select_one('h3').get_text()
trophy_desc: str = entry.select_one('h5').get_text() trophy_desc: str = entry.select_one('h5').get_text()
trophy_icon: str = entry.select_one('img')['src'] trophy_icon: str = entry.select_one('img').src
time_acquired_html: str = entry.select_one('.achieveUnlockTime') time_acquired_html: str = entry.select_one('.achieveUnlockTime')
if time_acquired_html is None: if time_acquired_html is None:
@ -89,14 +85,11 @@ class SteamAchievement(Scraper):
yield { yield {
'game.name': game_name, 'game.name': game_name,
#'game.release_date': None,
'me.last_played_time': time_acquired, 'me.last_played_time': time_acquired,
# Trophy Data # Trophy Data
'trophy.name': trophy_name, 'trophy.name': trophy_name,
'trophy.desc': trophy_desc, 'trophy.desc': trophy_desc,
'trophy.icon': trophy_icon, 'trophy.icon': trophy_icon,
# Ids
'steam.appid': appid,
} }
del entry, time_acquired del entry, time_acquired

View File

@ -41,7 +41,7 @@ def load_credentials() -> CredentialsType:
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class WithingsActivity(Scraper): class WithingsActivityScraper(Scraper):
dataset_name = 'withings_activity' dataset_name = 'withings_activity'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

View File

@ -19,6 +19,8 @@ DATETIME_UNITS = {
FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT' FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT'
FORMAT_DAY_MONTH_YEAR = '%d %B %Y'
def parse_duration(text: str) -> datetime.timedelta: def parse_duration(text: str) -> datetime.timedelta:
(num, unit) = text.split(' ') (num, unit) = text.split(' ')
@ -67,11 +69,7 @@ def parse_time(text: str) -> datetime.datetime:
def parse_date(text: str) -> datetime.date: def parse_date(text: str) -> datetime.date:
text = text.strip() return datetime.datetime.strptime(
if dt := try_parse(text, '%d %B %Y'): text.strip(),
return dt.date() FORMAT_DAY_MONTH_YEAR,
if dt := try_parse(text, '%b %d, %Y'): ).date()
return dt.date()
if dt := try_parse(text, '%B %d, %Y'):
return dt.date()
assert False, text