PSN Profiles
This commit is contained in:
parent
1ad2d31f15
commit
87189f1bd8
11
README.md
Normal file
11
README.md
Normal file
|
@ -0,0 +1,11 @@
|
|||
|
||||
# Personal Data Fetcher Systems
|
||||
|
||||
This is a collection of small fetchers for personal data spread around the internet.
|
||||
|
||||
## Ideas for more fetchers
|
||||
|
||||
- [ ] Save data for Theatherythm: Most played songs, and liked songs.
|
||||
- [ ] YouTube (Music): Liked videos with title and URL.
|
||||
- [ ] Playstation: Achivement dates, hours played and last played dates
|
||||
|
|
@ -7,16 +7,19 @@ import io
|
|||
import browsercookie
|
||||
from frozendict import frozendict
|
||||
import logging
|
||||
import cfscrape
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import personal_data.fetchers.playstation
|
||||
import personal_data.fetchers.crunchyroll
|
||||
import personal_data.fetchers.psnprofiles
|
||||
|
||||
def determine_scrapers():
|
||||
scrapers = []
|
||||
#scrapers += personal_data.fetchers.playstation.SCRAPERS
|
||||
scrapers += personal_data.fetchers.crunchyroll.SCRAPERS
|
||||
#scrapers += personal_data.fetchers.crunchyroll.SCRAPERS
|
||||
scrapers += personal_data.fetchers.psnprofiles.SCRAPERS
|
||||
return scrapers
|
||||
|
||||
def extend_csv_file(filename, new_dicts, deduplicate = False):
|
||||
|
@ -31,10 +34,15 @@ def extend_csv_file(filename, new_dicts , deduplicate = False):
|
|||
logger.info('Creating file: %s', filename)
|
||||
pass
|
||||
|
||||
original_num_dicts = len(dicts)
|
||||
dicts += [frozendict(d) for d in new_dicts]
|
||||
del new_dicts
|
||||
|
||||
fieldnames = list(dicts[0].keys())
|
||||
fieldnames = []
|
||||
for d in dicts:
|
||||
for k in d.keys():
|
||||
if k not in fieldnames:
|
||||
fieldnames.append(k)
|
||||
|
||||
if deduplicate:
|
||||
dicts = sorted(set(dicts), key = lambda d: d[fieldnames[0]])
|
||||
|
@ -50,10 +58,11 @@ def extend_csv_file(filename, new_dicts , deduplicate = False):
|
|||
with open(filename, 'w') as csvfile:
|
||||
csvfile.write(output_csv)
|
||||
del csvfile
|
||||
logger.warning('Extended CSV "%s" from %d to %d lines', filename, original_num_dicts, len(dicts))
|
||||
|
||||
STANDARD_HEADERS = {
|
||||
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
#"Accept": "application/json, text/plain, */*",
|
||||
'Accept-Language': 'en-US,en;q=0.5',
|
||||
'Accept-Encoding': 'gzip, deflate, br',
|
||||
}
|
||||
|
@ -62,15 +71,20 @@ def main():
|
|||
cookiejar = browsercookie.firefox()
|
||||
logger.warning('Got cookiejar from firefox: %s cookies', len(cookiejar))
|
||||
|
||||
session = requests_cache.CachedSession('web_cache', cookies = cookiejar)
|
||||
#session = requests_cache.CachedSession('web_cache', cookies = cookiejar)
|
||||
session = cfscrape.create_scraper()
|
||||
for cookie in cookiejar:
|
||||
session.cookies.set_cookie(cookie)
|
||||
|
||||
for scraper in determine_scrapers():
|
||||
logger.warning('Running scraper: %s', scraper.dataset_name)
|
||||
result_rows = list(scraper.scraper(session))
|
||||
result_rows = list()
|
||||
for result in scraper.scraper(session):
|
||||
result_rows.append(result)
|
||||
del result
|
||||
extend_csv_file('output/'+scraper.dataset_name, result_rows,
|
||||
deduplicate = scraper.deduplicate)
|
||||
logger.warning('Scraper done: %s', scraper.dataset_name)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
113
personal_data/fetchers/psnprofiles.py
Normal file
113
personal_data/fetchers/psnprofiles.py
Normal file
|
@ -0,0 +1,113 @@
|
|||
import secrets
|
||||
import functools
|
||||
import re
|
||||
import logging
|
||||
import bs4
|
||||
import datetime
|
||||
|
||||
from personal_data.data import Scraper
|
||||
import personal_data.html_util
|
||||
import personal_data.parse_util
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
URL_PROFILE = 'https://psnprofiles.com/{psn_id}'
|
||||
|
||||
FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT'
|
||||
FORMAT_DAY_MONTH_YEAR = '%d %B %Y'
|
||||
|
||||
def game_psnprofiles_id_from_url(relative_url: str) -> int:
|
||||
m = re.match(r'/(?:trophy|trophies)/(\d+)\-(?:[\w-]+)(/[\w-]*)?', relative_url)
|
||||
result = m.group(1)
|
||||
return int(result)
|
||||
|
||||
assert game_psnprofiles_id_from_url('/trophies/21045-theatrhythm-final-bar-line')
|
||||
assert game_psnprofiles_id_from_url('/trophies/21045-theatrhythm-final-bar-line/')
|
||||
assert game_psnprofiles_id_from_url('/trophies/21045-theatrhythm-final-bar-line/Jmaanes')
|
||||
assert game_psnprofiles_id_from_url('/trophy/21045-theatrhythm-final-bar-line/19-seasoned-hunter')
|
||||
|
||||
def scrape_personal_page(session):
|
||||
# Request to get watch history
|
||||
logger.info('Getting Watchlist')
|
||||
url = URL_PROFILE.format(psn_id = secrets.PLAYSTATION_PSN_ID)
|
||||
response = session.get(url)
|
||||
response.raise_for_status()
|
||||
|
||||
NOW = datetime.datetime.strptime(response.headers['Date'], FORMAT_DATE_HEADER)
|
||||
|
||||
# Parse data
|
||||
soup = bs4.BeautifulSoup(response.content, 'lxml')
|
||||
soup = personal_data.html_util.normalize_soup_slightly(soup, classes = False)
|
||||
|
||||
# Recent trophies.
|
||||
soup_recent_tropies = soup.select('ul#recent-trophies > li')
|
||||
assert len(soup_recent_tropies) > 0, url
|
||||
for row in soup_recent_tropies:
|
||||
cells = row.select_one('.info .box td').find_all('div')
|
||||
|
||||
trophy_name = cells[0].get_text().strip()
|
||||
trophy_desc = cells[1].get_text().strip()
|
||||
game_name = cells[2].a.extract().get_text().strip()
|
||||
|
||||
psnprofiles_id = game_psnprofiles_id_from_url(cells[0].find('a')['href'])
|
||||
trophy_icon = row.find(class_='icon').find('img')['src']
|
||||
|
||||
gotten_at = cells[2].get_text().strip().removesuffix(' in').removesuffix(' ago')
|
||||
gotten_at = personal_data.parse_util.parse_duration(gotten_at)
|
||||
time_acquired = NOW - gotten_at
|
||||
|
||||
yield {
|
||||
'game.name' : game_name,
|
||||
'me.last_played_time': time_acquired,
|
||||
|
||||
# Trophy Data
|
||||
'trophy.name': trophy_name,
|
||||
'trophy.desc': trophy_desc,
|
||||
'trophy.icon': trophy_icon,
|
||||
'psnprofiles.game_id': psnprofiles_id,
|
||||
}
|
||||
|
||||
del row, cells, time_acquired
|
||||
|
||||
# Games table
|
||||
table_rows = soup.find(id = 'gamesTable').find_all('tr')
|
||||
assert len(table_rows) > 0, url
|
||||
|
||||
for row in table_rows:
|
||||
cells = row.find_all('td')
|
||||
|
||||
# Check for pagination
|
||||
if re.match(r'show \d+ more games', cells[0].get_text().strip(), re.IGNORECASE):
|
||||
break
|
||||
|
||||
game_name = cells[1].find(class_ = 'title').get_text()
|
||||
psnprofiles_id = game_psnprofiles_id_from_url(cells[0].find('a')['href'])
|
||||
game_icon = cells[0].find('img')['src']
|
||||
|
||||
game_name = row.select_one('.title').get_text()
|
||||
game_platform = row.select_one('.platform').get_text()
|
||||
|
||||
small_infos = cells[1].find_all('div')
|
||||
if len(small_infos) > 2:
|
||||
time_played_div = small_infos[2]
|
||||
time_played_div.sup.extract()
|
||||
time_played = datetime.datetime.strptime(time_played_div.get_text().strip(), FORMAT_DAY_MONTH_YEAR).date()
|
||||
else:
|
||||
time_played = None
|
||||
|
||||
d = {
|
||||
# Important fields
|
||||
'game.name': game_name,
|
||||
|
||||
# Secondary fields
|
||||
'game.platform': game_platform,
|
||||
'game.icon': game_icon,
|
||||
'psnprofiles.game_id': psnprofiles_id,
|
||||
}
|
||||
if time_played:
|
||||
d['me.last_played_time'] = time_played
|
||||
yield d
|
||||
|
||||
SCRAPERS = [
|
||||
Scraper(scrape_personal_page, 'games_played_playstation', deduplicate = True)
|
||||
]
|
73
personal_data/html_util.py
Normal file
73
personal_data/html_util.py
Normal file
|
@ -0,0 +1,73 @@
|
|||
import re
|
||||
import bs4
|
||||
|
||||
HTML_TAGS_MOSTLY_CONTENTLESS: set[str] = {'style', 'svg', 'link', 'br', 'math',
|
||||
'canvas'}
|
||||
|
||||
HTML_TAGS_WITH_LITTLE_CONTENT: set[str] = { 'head', 'script', 'meta' } | HTML_TAGS_MOSTLY_CONTENTLESS
|
||||
|
||||
def normalize_text(text: str) -> str:
|
||||
text = text.replace('\t', ' ')
|
||||
text = text.replace('\r', '')
|
||||
text = re.sub(r'\s*\n\s*\n\s*', '\n\n', text)
|
||||
text = re.sub(r' +', ' ', text)
|
||||
text = re.sub(r'^\s+', '', text)
|
||||
text = re.sub(r'\s+$', '', text)
|
||||
return text.encode('utf-8')
|
||||
|
||||
def normalize_soup_bs4(soup: bs4.BeautifulSoup) -> bytes:
|
||||
for comment in soup(text=lambda text: isinstance(text, bs4.Comment)):
|
||||
comment.extract()
|
||||
del comment
|
||||
for element_name in HTML_TAGS_WITH_LITTLE_CONTENT:
|
||||
for script_elements in soup(element_name):
|
||||
script_elements.decompose()
|
||||
del element_name
|
||||
soup.smooth()
|
||||
return soup
|
||||
|
||||
def normalize_soup_lxml(soup) -> bytes:
|
||||
for element_name in HTML_TAGS_WITH_LITTLE_CONTENT:
|
||||
for script_elements in soup.cssselect(element_name):
|
||||
script_elements.drop_tree()
|
||||
del script_elements
|
||||
del element_name
|
||||
return soup
|
||||
|
||||
def normalize_soup(soup) -> bytes:
|
||||
text = None
|
||||
if isinstance(soup, bs4.BeautifulSoup):
|
||||
text = normalize_soup_bs4(soup).get_text()
|
||||
else:
|
||||
text = normalize_soup_lxml(soup).text_content()
|
||||
return normalize_text(text)
|
||||
|
||||
def normalize_soup_slightly(soup, classes = True, scripts = True, comments = True):
|
||||
# Little if any content
|
||||
for tag in HTML_TAGS_MOSTLY_CONTENTLESS:
|
||||
for e in soup.select(tag):
|
||||
e.decompose()
|
||||
|
||||
if classes:
|
||||
for e in soup.find_all(class_=True):
|
||||
del e['class']
|
||||
for e in soup.find_all('script', src=True):
|
||||
e.decompose()
|
||||
for e in soup.find_all(style=True):
|
||||
del e['style']
|
||||
for e in soup.select('a'):
|
||||
del e['height'], e['target'], e['rel'], e['onclick']
|
||||
|
||||
for e in soup.select('a[href=""]'):
|
||||
del e['href']
|
||||
|
||||
if scripts:
|
||||
for e in soup.find_all('script'):
|
||||
e.decompose()
|
||||
|
||||
if comments:
|
||||
for c in soup.find_all(string=lambda text: isinstance(text, bs4.Comment)):
|
||||
c.extract()
|
||||
|
||||
soup.smooth()
|
||||
return soup
|
25
personal_data/parse_util.py
Normal file
25
personal_data/parse_util.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
|
||||
import datetime
|
||||
|
||||
DATETIME_UNITS = {
|
||||
'second': datetime.timedelta(seconds = 1),
|
||||
'seconds': datetime.timedelta(seconds = 1),
|
||||
'minute': datetime.timedelta(minutes = 1),
|
||||
'minutes': datetime.timedelta(minutes = 1),
|
||||
'hour': datetime.timedelta(hours = 1),
|
||||
'hours': datetime.timedelta(hours = 1),
|
||||
'day': datetime.timedelta(days = 1),
|
||||
'days': datetime.timedelta(days = 1),
|
||||
'week': datetime.timedelta(days = 7),
|
||||
'weeks': datetime.timedelta(days = 7),
|
||||
'month': datetime.timedelta(days = 30),
|
||||
'months': datetime.timedelta(days = 30),
|
||||
'year': datetime.timedelta(days = 365),
|
||||
'years': datetime.timedelta(days = 365),
|
||||
}
|
||||
|
||||
def parse_duration(text: str) -> datetime.timedelta:
|
||||
(num, unit) = text.split(' ')
|
||||
num = int(num)
|
||||
unit = DATETIME_UNITS[unit]
|
||||
return unit * num
|
Loading…
Reference in New Issue
Block a user