1
0

Compare commits

..

4 Commits

Author SHA1 Message Date
51d49bf9f7
More games
Some checks failed
Test Python / Test (push) Failing after 29s
2024-08-25 23:12:33 +02:00
28e18fc1b4
Fixed title and description 2024-08-25 22:15:13 +02:00
72804d145f
Additional types 2024-08-25 21:53:18 +02:00
6bdd778028
Improved caching situation 2024-08-25 21:18:55 +02:00
5 changed files with 47 additions and 22 deletions

View File

@ -10,11 +10,13 @@ from personal_data import secrets
from personal_data.data import DeduplicateMode, Scraper
from .. import parse_util
import requests_util
logger = logging.getLogger(__name__)
URL_PROFILE = 'https://psnprofiles.com/{psn_id}'
URL_USER_GAME_TROPHIES = 'https://psnprofiles.com/trophies/{game_id}/{psn_id}'
URL_API_ROOT = 'https://psnprofiles.com/'
URL_PROFILE = URL_API_ROOT + '{psn_id}'
URL_USER_GAME_TROPHIES = URL_API_ROOT + 'trophies/{game_id}/{psn_id}'
def game_psnprofiles_id_from_url(relative_url: str) -> int:
@ -23,8 +25,7 @@ def game_psnprofiles_id_from_url(relative_url: str) -> int:
return int(result)
MAX_GAME_ITERATIONS = 10
MAX_NUMBER_GAMES_TO_PARSE = 1000
@dataclasses.dataclass(frozen=True)
class PsnProfilesScraper(Scraper):
@ -36,19 +37,30 @@ class PsnProfilesScraper(Scraper):
return True
def scrape(self):
games_rows = list(self.scrape_games_overview())
self._setup_cache()
games_rows = list(self._scrape_games_overview())
games_ids = {row['psnprofiles.game_id']: row['game.name'] for row in games_rows}
yield from games_rows
SCRAPE_FROM_OVERVIEW = False
if SCRAPE_FROM_OVERVIEW:
yield from games_rows
idx = 0
for game_id, game_name in games_ids.items():
yield from self.scrape_game_trophies(game_id, game_name)
yield from self._scrape_game_trophies(game_id, game_name)
del game_id
idx += 1
if idx >= MAX_GAME_ITERATIONS:
if idx >= MAX_NUMBER_GAMES_TO_PARSE:
break
def scrape_games_overview(self) -> Iterator[dict]:
def _setup_cache(self):
requests_util.setup_limiter(
self.session,
URL_API_ROOT,
per_minute = 5,
)
def _scrape_games_overview(self) -> Iterator[dict]:
# Request to get overview
logger.info('Getting Overview')
url = URL_PROFILE.format(psn_id=secrets.PLAYSTATION_PSN_ID)
@ -136,7 +148,7 @@ class PsnProfilesScraper(Scraper):
d['me.last_played_time'] = time_played
yield d
def scrape_game_trophies(
def _scrape_game_trophies(
self,
psnprofiles_id: int,
game_name: str,

View File

@ -12,10 +12,11 @@ import bs4
import personal_data.html_util
import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper
import requests_util
URL_API_ROOT = 'https://tavex.dk/'
def parse_dkk_price(dkk: str) -> Decimal:
print(dkk)
if dkk.strip() == '-':
return None
return Decimal(dkk.removesuffix(' DKK').replace(',', '.'))
@ -33,7 +34,15 @@ class TavexScraperBase(Scraper):
def page_url() -> str:
pass
def _setup_cache(self):
requests_util.setup_limiter(
self.session,
URL_API_ROOT,
per_minute = 5,
)
def scrape(self):
self._setup_cache()
response = self.session.get(self.page_url())
response.raise_for_status()
@ -77,7 +86,7 @@ class TavexScraperGold(TavexScraperBase):
@staticmethod
def page_url() -> str:
return 'https://tavex.dk/guld/1oz-canadisk-maple-leaf-guldmont/'
return f'{URL_API_ROOT}/guld/1oz-canadisk-maple-leaf-guldmont/'
@dataclasses.dataclass(frozen=True)
@ -87,4 +96,4 @@ class TavexScraperSilver(TavexScraperBase):
@staticmethod
def page_url() -> str:
return 'https://tavex.dk/solv/1-oz-american-eagle-solvmont-tidligere-argange/'
return f'{URL_API_ROOT}/solv/1-oz-american-eagle-solvmont-tidligere-argange/'

View File

@ -43,6 +43,7 @@ if cfscrape:
class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
pass
CACHE_EXPIRE_DEFAULT = datetime.timedelta(days=7)
def get_session(
cookiejar: Sequence,
@ -57,7 +58,7 @@ def get_session(
return requests.Session()
if cfscrape:
session_class = CachedCfScrape
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=datetime.timedelta(days=1))
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT)
for cookie in cookiejar:
session.cookies.set_cookie(cookie)
return session

View File

@ -58,6 +58,7 @@ def csv_str_to_value(
return None
return s
def csv_safe_value(v: object) -> str:
if isinstance(v, urllib.parse.ParseResult):
return v.geturl()
@ -91,6 +92,8 @@ def deduplicate_by_ignoring_certain_fields(
for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
if equals_without_fields(first, second, deduplicate_ignore_columns):
to_remove.add(idx2)
del idx2, second
del idx1, first
to_remove = sorted(to_remove)
while to_remove:
@ -100,10 +103,10 @@ def deduplicate_by_ignoring_certain_fields(
def deduplicate_dicts(
dicts: Sequence[dict],
dicts: Sequence[dict[str,typing.Any] | frozendict[str,typing.Any]],
deduplicate_mode: data.DeduplicateMode,
deduplicate_ignore_columns: list[str],
) -> tuple[Sequence[dict], list[str]]:
) -> tuple[Sequence[dict[str,typing.Any]], list[str]]:
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
fieldnames = []
@ -133,7 +136,7 @@ def deduplicate_dicts(
return dicts, fieldnames
def normalize_dict(d: dict) -> frozendict:
def normalize_dict(d: dict[str,typing.Any]) -> frozendict[str,typing.Any]:
return frozendict(
{k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
)

View File

@ -17,12 +17,13 @@ def parse_arguments():
def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
max_title_parts = 2
cal = icalendar.Calendar()
cal.add('prodid', '-//personal_data_calendar//example.org//')
cal.add('version', '2.0')
for event_data in rows:
print(event_data)
# Select data
possible_time_keys = [
@ -34,20 +35,20 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
]
date = event_data[possible_time_keys[0]] if possible_time_keys else None
title = event_data[possible_name_keys[0]]
image = event_data[possible_image_keys[0]] if possible_image_keys else None
if date is None:
continue
description = '\n\n'.join(event_data[k] for k in possible_name_keys)
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
description = '\n\n'.join(event_data[k] for k in possible_name_keys[max_title_parts:])
# Create event
event = icalendar.Event()
event.add('summary', title)
event.add('description', description)
event.add('dtstart', date)
event.add('dtend', date)
event.add('dtend', date + datetime.timedelta(minutes=30))
event.add('created', NOW)
event.add('dtstamp', NOW)
if image:
@ -62,7 +63,6 @@ def main():
args = parse_arguments()
dicts = load_csv_file(args.data_folder + '/games_played_playstation.csv')
print(dicts)
calendar = generate_calendar(dicts)