1
0

Compare commits

..

No commits in common. "b67089f9111991ca956f0cbc2e6c7d975cee3bd9" and "ce89103c325ae5e7763c61f804bf11062c5e3940" have entirely different histories.

7 changed files with 25 additions and 57 deletions

View File

@ -213,7 +213,7 @@ def import_watched_series_csv_from_file(vault: ObsidianVault) -> int:
def import_played_games_csv_from_file(vault: ObsidianVault) -> int:
data_path = Path('output/games_played.csv')
data_path = Path('output/games_played_playstation.csv')
return import_activity_sample_csv_from_file(
vault,
data_path,

View File

@ -18,13 +18,12 @@ URL_PROFILE_MOUNTS = (
'https://eu.finalfantasyxiv.com/lodestone/character/{character_id}/mount/'
)
FFXIV_ARR_NAME = 'Final Fantasy XIV: A Realm Reborn'
FFXIV_ARR_RELEASE_DATE = datetime.date(2013,8,27)
FORMAT_DATE_HEADER = '%d/%m/%YYYY'
@dataclasses.dataclass(frozen=True)
class LodestoneAchievement(Scraper):
dataset_name = 'games_played'
class LodestoneAchievementScraper(Scraper):
dataset_name = 'games_played_playstation'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
def scrape(self):
@ -68,8 +67,7 @@ class LodestoneAchievement(Scraper):
trophy_icon = trophy_icon.src
yield {
'game.name': FFXIV_ARR_NAME,
'game.release_date': FFXIV_ARR_RELEASE_DATE,
'game.name': 'Final Fantasy XIV: A Realm Reborn',
'me.last_played_time': time_acquired,
# Trophy Data
'trophy.name': trophy_name,

View File

@ -43,7 +43,7 @@ def iterate_watched_episodes_of_series(client, series_id: str):
@dataclasses.dataclass(frozen=True)
class JellyfinWatchHistory(Scraper):
class JellyfinWatchHistoryScraper(Scraper):
dataset_name = 'show_episodes_watched'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

View File

@ -1,5 +1,4 @@
import dataclasses
import datetime
import logging
import re
from collections.abc import Iterator
@ -31,10 +30,8 @@ MAX_NUMBER_GAMES_TO_PARSE = 10000
@dataclasses.dataclass(frozen=True)
class PsnProfiles(Scraper):
"""Downloads all trophies for the given user."""
dataset_name = 'games_played'
class PsnProfilesScraper(Scraper):
dataset_name = 'games_played_playstation'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
@staticmethod
@ -127,16 +124,6 @@ class PsnProfiles(Scraper):
d['me.last_played_time'] = time_played
yield d
def _parse_game_release_date(self, soup: bs4.BeautifulSoup) -> datetime.date:
table_rows = soup.select('table.gameInfo tr')
for row in table_rows:
cells = row.select('td')
if cells[0].get_text() in {'Release', 'Releases'}:
text = cells[1].get_text()
dates = re.findall(r'\w+\s+\d+,\s+\d{4}', text)
return min(parse_util.parse_date(date) for date in dates)
assert False, 'Could not find release date'
def _scrape_game_trophies(
self,
psnprofiles_id: int,
@ -156,14 +143,8 @@ class PsnProfiles(Scraper):
# Parse data
soup = bs4.BeautifulSoup(response.content, 'lxml')
# Normalize before parsing trophies
soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
# Parse release year
game_release_date = self._parse_game_release_date(soup)
assert game_release_date
# Remove redundant
for redundant in soup.select('.wide-ad'):
redundant.extract()
@ -193,13 +174,11 @@ class PsnProfiles(Scraper):
yield {
'game.name': game_name,
'game.release_date': game_release_date,
'me.last_played_time': gotten_at,
# Trophy Data
'trophy.name': trophy_name,
'trophy.desc': trophy_desc,
'trophy.icon': trophy_icon,
# Ids
'psnprofiles.game_id': psnprofiles_id,
}

View File

@ -21,20 +21,16 @@ FORMAT_DATE_HEADER = '%d/%m/%YYYY'
@dataclasses.dataclass(frozen=True)
class SteamAchievement(Scraper):
"""Downloads all trophies for the given user."""
dataset_name = 'games_played'
class SteamAchievementScraper(Scraper):
dataset_name = 'games_played_TODO'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
def scrape(self) -> Iterator[dict[str, Any]]:
username: str = secrets.STEAM_USERNAME
appids = list(self._determine_appids_from_recent_activity(username))
logger.info('Found %d Steam Apps', len(appids))
for appid in appids:
yield from self._scrape_app_achievements(username, appid)
username = secrets.STEAM_USERNAME
for appid in self.determine_appids_from_recent_activity(username):
yield from self.scrape_app(username, appid)
def _determine_appids_from_recent_activity(self, username: str) -> Iterator[int]:
def determine_appids_from_recent_activity(self, username: str) -> Iterator[int]:
url = URL_USER_RECENT_ACTIVITY.format(
username=username,
)
@ -51,9 +47,7 @@ class SteamAchievement(Scraper):
appid = int(href.split('/')[-1])
yield appid
def _scrape_app_achievements(
self, username: str, appid: int,
) -> Iterator[dict[str, Any]]:
def scrape_app(self, username: str, appid: int) -> Iterator[dict[str, Any]]:
url = URL_GAME_ACHIVEMENTS.format(
username=username,
appid=appid,
@ -61,6 +55,8 @@ class SteamAchievement(Scraper):
response = self.session.get(url)
response.raise_for_status()
NOW = parse_util.parse_response_datetime(response)
# Parse data
soup = bs4.BeautifulSoup(response.content, 'lxml')
@ -77,7 +73,7 @@ class SteamAchievement(Scraper):
for entry in soup.select('.achieveRow'):
trophy_name: str = entry.select_one('h3').get_text()
trophy_desc: str = entry.select_one('h5').get_text()
trophy_icon: str = entry.select_one('img')['src']
trophy_icon: str = entry.select_one('img').src
time_acquired_html: str = entry.select_one('.achieveUnlockTime')
if time_acquired_html is None:
@ -89,14 +85,11 @@ class SteamAchievement(Scraper):
yield {
'game.name': game_name,
#'game.release_date': None,
'me.last_played_time': time_acquired,
# Trophy Data
'trophy.name': trophy_name,
'trophy.desc': trophy_desc,
'trophy.icon': trophy_icon,
# Ids
'steam.appid': appid,
}
del entry, time_acquired

View File

@ -41,7 +41,7 @@ def load_credentials() -> CredentialsType:
@dataclasses.dataclass(frozen=True)
class WithingsActivity(Scraper):
class WithingsActivityScraper(Scraper):
dataset_name = 'withings_activity'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

View File

@ -19,6 +19,8 @@ DATETIME_UNITS = {
FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT'
FORMAT_DAY_MONTH_YEAR = '%d %B %Y'
def parse_duration(text: str) -> datetime.timedelta:
(num, unit) = text.split(' ')
@ -67,11 +69,7 @@ def parse_time(text: str) -> datetime.datetime:
def parse_date(text: str) -> datetime.date:
text = text.strip()
if dt := try_parse(text, '%d %B %Y'):
return dt.date()
if dt := try_parse(text, '%b %d, %Y'):
return dt.date()
if dt := try_parse(text, '%B %d, %Y'):
return dt.date()
assert False, text
return datetime.datetime.strptime(
text.strip(),
FORMAT_DAY_MONTH_YEAR,
).date()