
Ruff format
Some checks failed
Python Package / Package (push) Failing after 17s

This commit is contained in:
Jon Michael Aanes 2024-04-01 00:55:55 +02:00
parent 3851c94929
commit 6d3a8fd56e
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
9 changed files with 252 additions and 159 deletions

View File

@@ -1,23 +1,25 @@
import csv
import datetime
import io
import logging

import browsercookie
import cfscrape
import requests
import requests_cache
from frozendict import frozendict

logger = logging.getLogger(__name__)

import personal_data.data
import personal_data.fetchers.crunchyroll
import personal_data.fetchers.ffxiv_lodestone
import personal_data.fetchers.playstation
import personal_data.fetchers.psnprofiles

CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)


def try_value(fn, s: str) -> any:
    try:

@@ -25,15 +27,16 @@ def try_value(fn, s: str) -> any:
    except ValueError:
        return None


def to_value(s: str) -> any:
    s = s.strip()
    if len(s) == 0:
        return None
    if v := try_value(int, s):
        return v
    if v := try_value(datetime.date.fromisoformat, s):
        return v
    if v := try_value(datetime.datetime.fromisoformat, s):
        return v
    if s.lower() == 'false':
        return False

@@ -43,11 +46,16 @@ def to_value(s: str) -> any:
        return None
    return s


def extend_csv_file(
    filename: str,
    new_dicts: dict,
    deduplicate_mode: personal_data.data.DeduplicateMode,
):
    dicts = []
    try:
        with open(filename, 'r') as csvfile:
            reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
            for row in reader:
                for k in list(row.keys()):
                    row[k] = to_value(row[k])

@@ -73,10 +81,14 @@ def extend_csv_file(filename: str, new_dicts: dict, deduplicate_mode: personal_d
    if deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
        dicts = set(dicts)
    dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))

    csvfile_in_memory = io.StringIO()
    writer = csv.DictWriter(
        csvfile_in_memory,
        fieldnames=fieldnames,
        dialect=CSV_DIALECT,
    )
    writer.writeheader()
    for d in dicts:
        writer.writerow(d)

@@ -86,25 +98,33 @@ def extend_csv_file(filename: str, new_dicts: dict, deduplicate_mode: personal_d
    with open(filename, 'w') as csvfile:
        csvfile.write(output_csv)
    del csvfile

    logger.warning(
        'Extended CSV "%s" from %d to %d lines',
        filename,
        original_num_dicts,
        len(dicts),
    )


STANDARD_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
    # "Accept": "application/json, text/plain, */*",
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate, br',
}


def get_session(with_cfscrape: bool, cookiejar) -> requests.Session:
    assert isinstance(with_cfscrape, bool)
    if with_cfscrape:
        session = cfscrape.create_scraper()
    else:
        session = requests_cache.CachedSession('web_cache', cookies=cookiejar)
    for cookie in cookiejar:
        session.cookies.set_cookie(cookie)
    return session


def main():
    cookiejar = browsercookie.firefox()
    logger.warning('Got cookiejar from firefox: %s cookies', len(cookiejar))

@@ -112,17 +132,24 @@ def main():
    for scraper_cls in personal_data.data.Scraper.__subclasses__():
        session = get_session(scraper_cls.requires_cfscrape(), cookiejar)
        scraper = scraper_cls(session)
        logger.warning(
            'Running %s, appending to "%s"',
            scraper_cls.__name__,
            scraper.dataset_name,
        )
        del scraper_cls
        result_rows = list()
        for result in scraper.scrape():
            result_rows.append(result)
            del result
        extend_csv_file(
            'output/' + scraper.dataset_name,
            result_rows,
            deduplicate_mode=scraper.deduplicate_mode,
        )
        logger.warning('Scraper done: %s', scraper.dataset_name)
        del scraper, session


if __name__ == '__main__':
    main()
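
Given the helpers above, every cell written by extend_csv_file is read back through to_value on the next run, so the CSV acts as a lightly typed store. A rough sketch of the expected round-trip, with hypothetical cell values and assuming the definitions above are in scope (not part of this commit):

import datetime

# Hypothetical cell values; relies on try_value/to_value as defined above.
assert to_value('42') == 42
assert to_value('2024-04-01') == datetime.date(2024, 4, 1)
assert to_value('false') is False
assert to_value('') is None
assert to_value('plain text') == 'plain text'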

View File

@@ -1,14 +1,17 @@
import abc
import dataclasses
from enum import Enum

import requests


class DeduplicateMode(Enum):
    NONE = 0
    BY_FIRST_COLUMN = 1
    BY_ALL_COLUMNS = 2


@dataclasses.dataclass(frozen=True)
class Scraper(abc.ABC):
    session: requests.Session

@@ -31,4 +34,3 @@ class Scraper(abc.ABC):
    @abc.abstractmethod
    def scrape(self):
        pass
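
For reference, the fetcher modules in this commit register themselves simply by subclassing Scraper as a frozen dataclass; main() then discovers them via Scraper.__subclasses__(). A minimal sketch in the same style (hypothetical names; requires_cfscrape is assumed to be a static method, since main() calls it on the class):

import dataclasses

from personal_data.data import DeduplicateMode, Scraper


@dataclasses.dataclass(frozen=True)
class ExampleScraper(Scraper):  # hypothetical fetcher, for illustration only
    dataset_name = 'example_dataset'
    deduplicate_mode = DeduplicateMode.BY_FIRST_COLUMN

    @staticmethod
    def requires_cfscrape() -> bool:  # assumed signature, based on the call in main()
        return False

    def scrape(self):
        # Each yielded dict becomes one row in output/<dataset_name>.
        yield {'example.key': 'example value'}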

View File

@@ -1,46 +1,59 @@
import dataclasses
import logging
import secrets

from personal_data.data import DeduplicateMode, Scraper

logger = logging.getLogger(__name__)

API_ROOT = 'https://www.crunchyroll.com'
API_URL_TOKEN = API_ROOT + '/auth/v1/token'
API_URL_ME = API_ROOT + '/accounts/v1/me'
API_URL_WATCH_LIST = (
    API_ROOT + '/content/v2/{account_uuid}/watch-history?page_size=100&locale=en-US'
)


@dataclasses.dataclass(frozen=True)
class CrunchyrollScraper(Scraper):
    dataset_name = 'episodes_watched_crunchyroll'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

    def scrape(self):
        headers = {
            'Referer': 'https://www.crunchyroll.com/history',
            'Authorization': secrets.CRUNCHYROLL_AUTH,  # TODO: Determine automatically
        }

        # Request to get account UUID
        logger.info('Getting Access Token')
        response = self.session.post(
            API_URL_TOKEN,
            headers=headers,
            cookies=self.session.cookies,
            data={
                'device_id': secrets.CRUNCHYROLL_DEVICE_ID,  # TODO: Determine automatically
                'device_type': 'Firefox on Linux',
                'grant_type': 'etp_rt_cookie',
            },
        )
        response.raise_for_status()

        data_me = response.json()
        headers['Authorization'] = '{} {}'.format(
            data_me['token_type'],
            data_me['access_token'],
        )
        account_uuid = data_me['account_id']

        logger.info(' Account UUID: %s', account_uuid)

        # Request to get watch history
        logger.info('Getting Watchlist')
        response = self.session.get(
            API_URL_WATCH_LIST.format(account_uuid=account_uuid),
            headers=headers,
        )
        response.raise_for_status()

        # Parse data

@@ -49,24 +62,32 @@ class CrunchyrollScraper(Scraper):
        for episode_data in episodes_data:
            yield {
                # Sorting fields
                'datetime_played': episode_data['date_played'],
                # Important fields
                'series.title': episode_data['panel']['episode_metadata'][
                    'series_title'
                ],
                'season.number': episode_data['panel']['episode_metadata'][
                    'season_number'
                ],
                'episode.number': episode_data['panel']['episode_metadata']['episode'],
                'episode.name': episode_data['panel']['title'],
                # Secondary fields
                'episode.language': episode_data['panel']['episode_metadata'][
                    'audio_locale'
                ],
                'episode.duration_ms': episode_data['panel']['episode_metadata'][
                    'duration_ms'
                ],
                'episode.maturity_ratings': ' '.join(
                    episode_data['panel']['episode_metadata']['maturity_ratings'],
                ),
                'season.title': episode_data['panel']['episode_metadata'][
                    'season_title'
                ],
                'fully_watched': episode_data['fully_watched'],
                # Identifiers
                'episode.crunchyroll_id': episode_data['id'],
                'series.crunchyroll_id': episode_data['parent_id'],
            }

View File

@@ -1,31 +1,39 @@
import dataclasses
import datetime
import logging
import re
import secrets

import bs4

import personal_data.html_util
import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper

logger = logging.getLogger(__name__)

URL_PROFILE_ACHIEVEMENTS = 'https://eu.finalfantasyxiv.com/lodestone/character/{character_id}/achievement/?page={page_idx}'
URL_PROFILE_MINIONS = (
    'https://eu.finalfantasyxiv.com/lodestone/character/{character_id}/minion/'
)
URL_PROFILE_MOUNTS = (
    'https://eu.finalfantasyxiv.com/lodestone/character/{character_id}/mount/'
)

FORMAT_DATE_HEADER = '%d/%m/%YYYY'


@dataclasses.dataclass(frozen=True)
class LodestoneAchievementScraper(Scraper):
    dataset_name = 'games_played_playstation'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

    def scrape(self):
        for page_idx in range(1, 13 + 1):  # TODO: Automatically determine
            url = URL_PROFILE_ACHIEVEMENTS.format(
                character_id=secrets.FFXIV_CHARACTER_ID,
                page_idx=page_idx,
            )
            response = self.session.get(url)
            response.raise_for_status()

@@ -33,24 +41,35 @@ class LodestoneAchievementScraper(Scraper):
            # Parse data
            soup = bs4.BeautifulSoup(response.content, 'lxml')
            soup = personal_data.html_util.normalize_soup_slightly(
                soup,
                classes=False,
                scripts=False,
            )

            # print(soup)

            for entry in soup.select('.ldst__achievement ul li.entry'):
                time_acquired = str(entry.script.text).strip()
                time_acquired = re.search(
                    r'ldst_strftime\((\d+)\s*,',
                    time_acquired,
                ).group(1)
                time_acquired = int(time_acquired)
                time_acquired = datetime.datetime.fromtimestamp(time_acquired)
                trophy_desc = (
                    entry.select_one('.entry__activity__txt').get_text().strip()
                )
                trophy_name = re.match(
                    r'^.*achievement "([^"]+)" earned!$',
                    trophy_desc,
                ).group(1)
                trophy_icon = entry.select_one('.entry__achievement__frame img')
                trophy_icon = trophy_icon.src

                yield {
                    'game.name': 'Final Fantasy XIV: A Realm Reborn',
                    'me.last_played_time': time_acquired,
                    # Trophy Data
                    'trophy.name': trophy_name,
                    'trophy.desc': trophy_desc,

View File

@@ -1,18 +1,16 @@
import logging

logger = logging.getLogger(__name__)

URL_RECENTLY_PLAYED_HTML = 'https://library.playstation.com/recently-played'
URL_RECENTLY_PLAYED_API = 'https://web.np.playstation.com/api/graphql/v1/op?operationName=getUserGameList&variables=%7B%22limit%22%3A50%2C%22categories%22%3A%22ps4_game%2Cps5_native_game%22%7D&extensions=%7B%22persistedQuery%22%3A%7B%22version%22%3A1%2C%22sha256Hash%22%3A%22e0136f81d7d1fb6be58238c574e9a46e1c0cc2f7f6977a08a5a46f224523a004%22%7D%7D'


def scrape_played_last(session):
    # Initial request to trigger cookie.
    logger.warning('Trying to trigger initial cookie usage')
    response = session.get(URL_RECENTLY_PLAYED_HTML, cookies=session.cookies)
    response.raise_for_status()

    print('From herp')

@@ -23,19 +21,19 @@ def scrape_played_last(session):
    # Now trigger API call.
    logger.warning('Trying to fetch data from API')
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/119.0',
        'Accept': 'application/json',
        'Accept-Language': 'en-US,en;q=0.5',
        'content-type': 'application/json',
        'X-PSN-App-Ver': 'my-playstation/0.1.0-20230720235210-hotfix-1-g1e9f07ff-1e9f07ff247eafea4d9d9236f73863cb9cd5d3e8',
        'X-PSN-Correlation-Id': 'bc7a39b1-a99c-478b-9494-d2fddf189875',
        'apollographql-client-name': 'my-playstation',
        'apollographql-client-version': '0.1.0-20230720235210-hotfix-1-g1e9f07ff',
        'X-PSN-Request-Id': '8ad64653-d8b5-4941-b565-b5536c9853df',
        'Referer': 'https://library.playstation.com/',
        'Origin': 'https://library.playstation.com',
    }
    result = session.get(URL_RECENTLY_PLAYED_API, headers=headers)
    result.raise_for_status()

    print(result.json())

@@ -43,24 +41,23 @@ def scrape_played_last(session):
    print(games_data)

    for game_data in games_data:
        yield {
            # Important fields
            'game.name': game_data['name'],
            'me.last_played_time': game_data['lastPlayedDateTime'],
            'playstation.product_id': game_data['productId'],
            # Secondary fields
            'playstation.concept_id': game_data['conceptId'],
            'playstation.title_id': game_data['titleId'],
            'playstation.entitlement_id': game_data['entitlementId'],
            'me.acquired_by_playstation_membership': game_data['membership'],
            'game.platform': game_data['platform'],
            'game.icon': game_data['image']['url'],
        }


"""
SCRAPERS = [
    Scraper(scrape_played_last, 'games_played_playstation',
            deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS)
]
"""

View File

@@ -1,14 +1,14 @@
import dataclasses
import datetime
import logging
import re
import secrets

import bs4

import personal_data.html_util
import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper

logger = logging.getLogger(__name__)

@@ -16,17 +16,24 @@ URL_PROFILE = 'https://psnprofiles.com/{psn_id}'
FORMAT_DAY_MONTH_YEAR = '%d %B %Y'


def game_psnprofiles_id_from_url(relative_url: str) -> int:
    m = re.match(r'/(?:trophy|trophies)/(\d+)\-(?:[\w-]+)(/[\w-]*)?', relative_url)
    result = m.group(1)
    return int(result)


assert game_psnprofiles_id_from_url('/trophies/21045-theatrhythm-final-bar-line')
assert game_psnprofiles_id_from_url('/trophies/21045-theatrhythm-final-bar-line/')
assert game_psnprofiles_id_from_url(
    '/trophies/21045-theatrhythm-final-bar-line/Jmaanes',
)
assert game_psnprofiles_id_from_url(
    '/trophy/21045-theatrhythm-final-bar-line/19-seasoned-hunter',
)


@dataclasses.dataclass(frozen=True)
class PsnProfilesScraper(Scraper):
    dataset_name = 'games_played_playstation'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS

@@ -38,7 +45,7 @@ class PsnProfilesScraper(Scraper):
    def scrape(self):
        # Request to get watch history
        logger.info('Getting Watchlist')
        url = URL_PROFILE.format(psn_id=secrets.PLAYSTATION_PSN_ID)
        response = self.session.get(url)
        response.raise_for_status()

@@ -46,7 +53,7 @@ class PsnProfilesScraper(Scraper):
        # Parse data
        soup = bs4.BeautifulSoup(response.content, 'lxml')
        soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)

        # Recent trophies.
        soup_recent_tropies = soup.select('ul#recent-trophies > li')

@@ -61,14 +68,15 @@ class PsnProfilesScraper(Scraper):
            psnprofiles_id = game_psnprofiles_id_from_url(cells[0].find('a')['href'])
            trophy_icon = row.find(class_='icon').find('img')['src']

            gotten_at = (
                cells[2].get_text().strip().removesuffix(' in').removesuffix(' ago')
            )
            gotten_at = personal_data.parse_util.parse_duration(gotten_at)
            time_acquired = NOW - gotten_at

            yield {
                'game.name': game_name,
                'me.last_played_time': time_acquired.date(),
                # Trophy Data
                'trophy.name': trophy_name,
                'trophy.desc': trophy_desc,

@@ -79,17 +87,21 @@ class PsnProfilesScraper(Scraper):
            del row, cells, time_acquired

        # Games table
        table_rows = soup.find(id='gamesTable').find_all('tr')
        assert len(table_rows) > 0, url

        for row in table_rows:
            cells = row.find_all('td')

            # Check for pagination
            if re.match(
                r'show \d+ more games',
                cells[0].get_text().strip(),
                re.IGNORECASE,
            ):
                break

            game_name = cells[1].find(class_='title').get_text()
            psnprofiles_id = game_psnprofiles_id_from_url(cells[0].find('a')['href'])
            game_icon = cells[0].find('img')['src']

@@ -100,18 +112,20 @@ class PsnProfilesScraper(Scraper):
            if len(small_infos) > 2:
                time_played_div = small_infos[2]
                time_played_div.sup.extract()
                time_played = datetime.datetime.strptime(
                    time_played_div.get_text().strip(),
                    FORMAT_DAY_MONTH_YEAR,
                ).date()
            else:
                time_played = None

            d = {
                # Important fields
                'game.name': game_name,
                # Secondary fields
                'game.platform': game_platform,
                'game.icon': game_icon,
                'psnprofiles.game_id': psnprofiles_id,
            }

            if time_played:
                d['me.last_played_time'] = time_played

View File

@@ -1,10 +1,22 @@
import re

import bs4

HTML_TAGS_MOSTLY_CONTENTLESS: set[str] = {
    'style',
    'svg',
    'link',
    'br',
    'math',
    'canvas',
}

HTML_TAGS_WITH_LITTLE_CONTENT: set[str] = {
    'head',
    'script',
    'meta',
} | HTML_TAGS_MOSTLY_CONTENTLESS


def normalize_text(text: str) -> str:
    text = text.replace('\t', ' ')

@@ -15,6 +27,7 @@ def normalize_text(text: str) -> str:
    text = re.sub(r'\s+$', '', text)
    return text.encode('utf-8')


def normalize_soup_bs4(soup: bs4.BeautifulSoup) -> bytes:
    for comment in soup(text=lambda text: isinstance(text, bs4.Comment)):
        comment.extract()

@@ -26,6 +39,7 @@ def normalize_soup_bs4(soup: bs4.BeautifulSoup) -> bytes:
    soup.smooth()
    return soup


def normalize_soup_lxml(soup) -> bytes:
    for element_name in HTML_TAGS_WITH_LITTLE_CONTENT:
        for script_elements in soup.cssselect(element_name):

@@ -34,6 +48,7 @@ def normalize_soup_lxml(soup) -> bytes:
    del element_name
    return soup


def normalize_soup(soup) -> bytes:
    text = None
    if isinstance(soup, bs4.BeautifulSoup):

@@ -42,7 +57,8 @@ def normalize_soup(soup) -> bytes:
        text = normalize_soup_lxml(soup).text_content()
    return normalize_text(text)


def normalize_soup_slightly(soup, classes=True, scripts=True, comments=True):
    # Little if any content
    for tag in HTML_TAGS_MOSTLY_CONTENTLESS:
        for e in soup.select(tag):
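
Elsewhere in this commit the scrapers call this helper as normalize_soup_slightly(soup, classes=False) before selecting elements. A minimal usage sketch with a hypothetical document; the cleanup behaviour beyond the low-content tags listed above is assumed, not shown in this diff:

import bs4

import personal_data.html_util

# Hypothetical input, for illustration only.
html = '<html><body><p class="c">Hello<br/><svg></svg></p></body></html>'
soup = bs4.BeautifulSoup(html, 'lxml')
soup = personal_data.html_util.normalize_soup_slightly(soup, classes=False)
print(soup)  # <br> and <svg> are in HTML_TAGS_MOSTLY_CONTENTLESS, so they are expected to be removed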

View File

@@ -1,30 +1,31 @@
import datetime

DATETIME_UNITS = {
    'second': datetime.timedelta(seconds=1),
    'seconds': datetime.timedelta(seconds=1),
    'minute': datetime.timedelta(minutes=1),
    'minutes': datetime.timedelta(minutes=1),
    'hour': datetime.timedelta(hours=1),
    'hours': datetime.timedelta(hours=1),
    'day': datetime.timedelta(days=1),
    'days': datetime.timedelta(days=1),
    'week': datetime.timedelta(days=7),
    'weeks': datetime.timedelta(days=7),
    'month': datetime.timedelta(days=30),
    'months': datetime.timedelta(days=30),
    'year': datetime.timedelta(days=365),
    'years': datetime.timedelta(days=365),
}

FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT'


def parse_duration(text: str) -> datetime.timedelta:
    (num, unit) = text.split(' ')
    num = int(num)
    unit = DATETIME_UNITS[unit]
    return unit * num


def response_datetime(response) -> datetime.datetime:
    return datetime.datetime.strptime(response.headers['Date'], FORMAT_DATE_HEADER)
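
The relative duration strings scraped above (for example the " ago" suffixes stripped in the PSNProfiles scraper) map directly onto this unit table. A small sketch of the expected behaviour with hypothetical inputs; note that 'month' and 'year' are deliberately approximate 30- and 365-day deltas:

import datetime

from personal_data.parse_util import parse_duration

assert parse_duration('45 minutes') == datetime.timedelta(minutes=45)
assert parse_duration('3 weeks') == datetime.timedelta(days=21)
assert parse_duration('2 months') == datetime.timedelta(days=60)  # approximate by design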

View File

@@ -1,6 +1,2 @@
def test():
    assert True