213 lines
7.2 KiB
Python
213 lines
7.2 KiB
Python
import dataclasses
|
|
import datetime
|
|
import logging
|
|
import re
|
|
from collections.abc import Iterator
|
|
|
|
import bs4
|
|
import requests_util
|
|
|
|
import personal_data.html_util
|
|
from personal_data import secrets
|
|
from personal_data.data import DeduplicateMode, Scraper
|
|
|
|
from .. import parse_util
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Site root; every URL template below is built on top of it.
URL_API_ROOT = 'https://psnprofiles.com/'

# Templates with `{psn_id}` / `{game_id}` placeholders for `str.format`.
URL_PROFILE = f'{URL_API_ROOT}{{psn_id}}'
URL_USER_GAME_TROPHIES = f'{URL_API_ROOT}trophies/{{game_id}}/{{psn_id}}'
# The games overview lives at the same address as the profile page.
URL_GAMES_OVERVIEW = URL_PROFILE
|
|
|
|
|
|
def game_psnprofiles_id_from_url(relative_url: str) -> int:
    """Extract the numeric psnprofiles game id from a relative trophy URL.

    The URL must start with ``/trophy/`` or ``/trophies/`` followed by
    ``<digits>-<slug>``, optionally followed by one more path segment
    (e.g. ``/trophies/1234-some-game/some-user``).

    Args:
        relative_url: Site-relative URL, typically taken from an ``href``.

    Returns:
        The game id (the digits before the first dash) as an ``int``.

    Raises:
        ValueError: If ``relative_url`` does not have the expected shape.
    """
    m = re.match(r'/(?:trophy|trophies)/(\d+)\-(?:[\w-]+)(/[\w-]*)?', relative_url)
    if m is None:
        # Fail with the offending URL instead of an AttributeError on None.
        msg = f'Could not parse psnprofiles game id from URL: {relative_url!r}'
        raise ValueError(msg)
    return int(m.group(1))
|
|
|
|
|
|
# Safety cap: `PsnProfiles.scrape` stops iterating over games once its game
# index reaches this value.
MAX_NUMBER_GAMES_TO_PARSE = 10_000
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class PsnProfiles(Scraper):
    """Downloads all trophies for the given user.

    The paginated games overview is fetched first, then one trophy page
    per game. Individual game pages are cached for between 1 and 30 days,
    depending on the game's position in the overview (intended to reflect
    how recently you played them).
    """

    dataset_name = 'games_played'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

    @staticmethod
    def requires_cfscrape() -> bool:
        """Report that this scraper requires cfscrape."""
        return True

    def scrape(self):
        """Yield one row per timestamped, earned trophy across all games.

        Games are taken from the overview pages and visited in reverse
        overview order. At most ``MAX_NUMBER_GAMES_TO_PARSE`` games are
        visited.
        """
        self._setup_cache()
        games_rows = list(self._scrape_games_overview())
        # Maps game id -> game name; a repeated id keeps its first position
        # in the mapping but takes the name from the last row seen.
        games_ids = {row['psnprofiles.game_id']: row['game.name'] for row in games_rows}

        logger.info('Found %d games from overview', len(games_rows))

        for idx, (game_id, game_name) in enumerate(reversed(games_ids.items())):
            # Check the cap before fetching so exactly
            # MAX_NUMBER_GAMES_TO_PARSE games are processed at most.
            if idx >= MAX_NUMBER_GAMES_TO_PARSE:
                break
            # Cache lifetime grows with the position in the reversed
            # overview: 1 day for the first game visited, capped at 30 days.
            # NOTE(review): this gives the *last* overview entry the shortest
            # cache; confirm that matches the overview's sort order.
            cache_duration = datetime.timedelta(days=min(idx + 1, 30))
            yield from self._scrape_game_trophies(game_id, game_name, cache_duration)
        logger.info('Found all trophies for playstation games')

    def _setup_cache(self):
        """Install a limiter of 5 requests per minute for the site root."""
        requests_util.setup_limiter(
            self.session,
            URL_API_ROOT,
            per_minute=5,
        )

    def _scrape_games_overview(self) -> Iterator[dict]:
        """Yield one dict per game from the paginated games overview.

        Pages are requested one after another until a page yields no
        games (or page 999 has been processed).

        Raises:
            RuntimeError: If the final response URL no longer contains
                ``page``, which is taken as a sign of a redirect caused by
                a wrong profile name.
        """
        # The URL is the same for every page; only the query parameter varies.
        url = URL_GAMES_OVERVIEW.format(psn_id=secrets.PLAYSTATION_PSN_ID)
        for page_num in range(1, 1000):
            logger.info('Getting Overview (page %d)', page_num)
            response = self.session.get(url, params={'page': page_num})
            if 'page' not in response.url:
                msg = "Configuration error? psnprofiles.com made an redirection. This is possibly because your profile name wasn't exactly as expected. Please check it"
                raise RuntimeError(msg)
            response.raise_for_status()
            soup = bs4.BeautifulSoup(response.text, 'lxml')
            soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
            games_on_page = list(self._iterate_games_from_games_table(soup))
            yield from games_on_page
            if not games_on_page:
                # An empty page means we have walked past the last one.
                return

    def _iterate_games_from_games_table(self, soup) -> Iterator[dict]:
        """Yield one dict per game row found in the ``#gamesTable`` element.

        Each dict has the keys ``game.name``, ``game.platform``,
        ``game.icon`` and ``psnprofiles.game_id``, plus
        ``me.last_played_time`` when a date could be parsed for the row.

        Raises:
            RuntimeError: If the games table is missing or has no rows.
        """
        games_table = soup.find(id='gamesTable')
        if games_table is None:
            msg = 'Could not find the games table (id="gamesTable") on the overview page'
            raise RuntimeError(msg)
        table_rows = games_table.find_all('tr')
        if not table_rows:
            msg = 'The games table on the overview page contains no rows'
            raise RuntimeError(msg)

        # An overview page past the end shows a single "No games found" heading.
        if (title := table_rows[0].h2) and title.get_text().strip() == 'No games found':
            return

        for row in table_rows:
            cells = row.find_all('td')

            # Stop at the "Show N more games" pagination row.
            if re.match(
                r'show \d+ more games',
                cells[0].get_text().strip(),
                re.IGNORECASE,
            ):
                break

            psnprofiles_id = game_psnprofiles_id_from_url(cells[0].find('a')['href'])
            game_icon = cells[0].find('img')['src']

            game_name = row.select_one('.title').get_text()
            game_platform = row.select_one('.platform').get_text()

            # The third <div> of the second cell, when present, holds a date.
            time_played = None
            small_infos = cells[1].find_all('div')
            if len(small_infos) > 2:
                time_played_div = small_infos[2]
                # Drop the <sup> element before parsing (presumably an
                # ordinal suffix such as "st"/"th"); tolerate its absence.
                if time_played_div.sup is not None:
                    time_played_div.sup.extract()
                time_played = parse_util.parse_date(
                    time_played_div.get_text(),
                )

            d = {
                # Important fields
                'game.name': game_name,
                # Secondary fields
                'game.platform': game_platform,
                'game.icon': game_icon,
                'psnprofiles.game_id': psnprofiles_id,
            }
            if time_played:
                d['me.last_played_time'] = time_played
            yield d

    def _parse_game_release_date(self, soup: bs4.BeautifulSoup) -> datetime.date:
        """Return the earliest release date listed in the game info table.

        Looks for a ``table.gameInfo`` row whose first cell reads
        ``Release`` or ``Releases`` and parses every ``Month D, YYYY`` date
        in its second cell.

        Raises:
            RuntimeError: If no such row with at least one date is found.
        """
        for row in soup.select('table.gameInfo tr'):
            cells = row.select('td')
            if len(cells) < 2:
                # Not a label/value row; it cannot hold the release date.
                continue
            if cells[0].get_text() in {'Release', 'Releases'}:
                dates = re.findall(r'\w+\s+\d+,\s+\d{4}', cells[1].get_text())
                if dates:
                    return min(parse_util.parse_date(date) for date in dates)
        # Raise explicitly: an `assert` would be stripped under `python -O`.
        msg = 'Could not find release date'
        raise RuntimeError(msg)

    def _scrape_game_trophies(
        self,
        psnprofiles_id: int,
        game_name: str,
        cache_duration: datetime.timedelta,
    ) -> Iterator[dict]:
        """Yield one dict per earned, timestamped trophy of a single game.

        Args:
            psnprofiles_id: Numeric game id used in the trophy page URL.
            game_name: Name copied verbatim into every yielded row.
            cache_duration: Passed as ``expire_after`` to the session's
                ``get`` call.

        Rows without a trophy-name link, and rows whose timestamp cell
        reads "Missing Timestamp", are skipped.
        """
        assert isinstance(psnprofiles_id, int), psnprofiles_id
        assert isinstance(game_name, str), game_name

        logger.info('Getting Game Trophies %s', psnprofiles_id)

        url = URL_USER_GAME_TROPHIES.format(
            psn_id=secrets.PLAYSTATION_PSN_ID,
            game_id=psnprofiles_id,
        )
        response = self.session.get(url, expire_after=cache_duration)
        response.raise_for_status()

        # Parse data
        soup = bs4.BeautifulSoup(response.content, 'lxml')

        # Normalize before parsing trophies
        soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)

        # Parse release date (raises if the page does not list one)
        game_release_date = self._parse_game_release_date(soup)

        # Remove redundant elements so the trophy selector below does not
        # have to account for them.
        for redundant in soup.select('.wide-ad'):
            redundant.extract()
        for redundant in soup.select('div.col-xs-4'):
            redundant.extract()

        # Only rows marked as completed (earned trophies) are considered.
        trophy_rows = soup.select(
            '#content.page > .row > div.col-xs div.box table.zebra tr.completed',
        )
        for row in trophy_rows:
            cells = row.find_all('td')

            trophy_name_a = cells[1].a
            if trophy_name_a is None:
                continue
            trophy_name = trophy_name_a.get_text().strip()
            # Remove the name link so the remaining cell text is the description.
            trophy_name_a.extract()
            trophy_desc = cells[1].get_text().strip()

            trophy_icon = cells[0].img['src']

            if 'Missing\nTimestamp' in cells[2].get_text().strip():
                continue
            # Drop the nested <sup> element before parsing the timestamp text.
            cells[2].span.span.nobr.sup.extract()
            gotten_at = parse_util.parse_time(cells[2].get_text())

            yield {
                'game.name': game_name,
                'game.release_date': game_release_date,
                'me.last_played_time': gotten_at,
                # Trophy Data
                'trophy.name': trophy_name,
                'trophy.desc': trophy_desc,
                'trophy.icon': trophy_icon,
                # Ids
                'psnprofiles.game_id': psnprofiles_id,
            }
|