Compare commits
4 Commits
167b2c8f27
...
51d49bf9f7
Author | SHA1 | Date | |
---|---|---|---|
51d49bf9f7 | |||
28e18fc1b4 | |||
72804d145f | |||
6bdd778028 |
|
@ -10,11 +10,13 @@ from personal_data import secrets
|
||||||
from personal_data.data import DeduplicateMode, Scraper
|
from personal_data.data import DeduplicateMode, Scraper
|
||||||
|
|
||||||
from .. import parse_util
|
from .. import parse_util
|
||||||
|
import requests_util
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
URL_PROFILE = 'https://psnprofiles.com/{psn_id}'
|
URL_API_ROOT = 'https://psnprofiles.com/'
|
||||||
URL_USER_GAME_TROPHIES = 'https://psnprofiles.com/trophies/{game_id}/{psn_id}'
|
URL_PROFILE = URL_API_ROOT + '{psn_id}'
|
||||||
|
URL_USER_GAME_TROPHIES = URL_API_ROOT + 'trophies/{game_id}/{psn_id}'
|
||||||
|
|
||||||
|
|
||||||
def game_psnprofiles_id_from_url(relative_url: str) -> int:
|
def game_psnprofiles_id_from_url(relative_url: str) -> int:
|
||||||
|
@ -23,8 +25,7 @@ def game_psnprofiles_id_from_url(relative_url: str) -> int:
|
||||||
return int(result)
|
return int(result)
|
||||||
|
|
||||||
|
|
||||||
MAX_GAME_ITERATIONS = 10
|
MAX_NUMBER_GAMES_TO_PARSE = 1000
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(frozen=True)
|
@dataclasses.dataclass(frozen=True)
|
||||||
class PsnProfilesScraper(Scraper):
|
class PsnProfilesScraper(Scraper):
|
||||||
|
@ -36,19 +37,30 @@ class PsnProfilesScraper(Scraper):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def scrape(self):
|
def scrape(self):
|
||||||
games_rows = list(self.scrape_games_overview())
|
self._setup_cache()
|
||||||
|
games_rows = list(self._scrape_games_overview())
|
||||||
games_ids = {row['psnprofiles.game_id']: row['game.name'] for row in games_rows}
|
games_ids = {row['psnprofiles.game_id']: row['game.name'] for row in games_rows}
|
||||||
|
|
||||||
|
SCRAPE_FROM_OVERVIEW = False
|
||||||
|
if SCRAPE_FROM_OVERVIEW:
|
||||||
yield from games_rows
|
yield from games_rows
|
||||||
|
|
||||||
idx = 0
|
idx = 0
|
||||||
for game_id, game_name in games_ids.items():
|
for game_id, game_name in games_ids.items():
|
||||||
yield from self.scrape_game_trophies(game_id, game_name)
|
yield from self._scrape_game_trophies(game_id, game_name)
|
||||||
del game_id
|
del game_id
|
||||||
idx += 1
|
idx += 1
|
||||||
if idx >= MAX_GAME_ITERATIONS:
|
if idx >= MAX_NUMBER_GAMES_TO_PARSE:
|
||||||
break
|
break
|
||||||
|
|
||||||
def scrape_games_overview(self) -> Iterator[dict]:
|
def _setup_cache(self):
|
||||||
|
requests_util.setup_limiter(
|
||||||
|
self.session,
|
||||||
|
URL_API_ROOT,
|
||||||
|
per_minute = 5,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _scrape_games_overview(self) -> Iterator[dict]:
|
||||||
# Request to get overview
|
# Request to get overview
|
||||||
logger.info('Getting Overview')
|
logger.info('Getting Overview')
|
||||||
url = URL_PROFILE.format(psn_id=secrets.PLAYSTATION_PSN_ID)
|
url = URL_PROFILE.format(psn_id=secrets.PLAYSTATION_PSN_ID)
|
||||||
|
@ -136,7 +148,7 @@ class PsnProfilesScraper(Scraper):
|
||||||
d['me.last_played_time'] = time_played
|
d['me.last_played_time'] = time_played
|
||||||
yield d
|
yield d
|
||||||
|
|
||||||
def scrape_game_trophies(
|
def _scrape_game_trophies(
|
||||||
self,
|
self,
|
||||||
psnprofiles_id: int,
|
psnprofiles_id: int,
|
||||||
game_name: str,
|
game_name: str,
|
||||||
|
|
|
@ -12,10 +12,11 @@ import bs4
|
||||||
import personal_data.html_util
|
import personal_data.html_util
|
||||||
import personal_data.parse_util
|
import personal_data.parse_util
|
||||||
from personal_data.data import DeduplicateMode, Scraper
|
from personal_data.data import DeduplicateMode, Scraper
|
||||||
|
import requests_util
|
||||||
|
|
||||||
|
URL_API_ROOT = 'https://tavex.dk/'
|
||||||
|
|
||||||
def parse_dkk_price(dkk: str) -> Decimal:
|
def parse_dkk_price(dkk: str) -> Decimal:
|
||||||
print(dkk)
|
|
||||||
if dkk.strip() == '-':
|
if dkk.strip() == '-':
|
||||||
return None
|
return None
|
||||||
return Decimal(dkk.removesuffix(' DKK').replace(',', '.'))
|
return Decimal(dkk.removesuffix(' DKK').replace(',', '.'))
|
||||||
|
@ -33,7 +34,15 @@ class TavexScraperBase(Scraper):
|
||||||
def page_url() -> str:
|
def page_url() -> str:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def _setup_cache(self):
|
||||||
|
requests_util.setup_limiter(
|
||||||
|
self.session,
|
||||||
|
URL_API_ROOT,
|
||||||
|
per_minute = 5,
|
||||||
|
)
|
||||||
|
|
||||||
def scrape(self):
|
def scrape(self):
|
||||||
|
self._setup_cache()
|
||||||
response = self.session.get(self.page_url())
|
response = self.session.get(self.page_url())
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
|
@ -77,7 +86,7 @@ class TavexScraperGold(TavexScraperBase):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def page_url() -> str:
|
def page_url() -> str:
|
||||||
return 'https://tavex.dk/guld/1oz-canadisk-maple-leaf-guldmont/'
|
return f'{URL_API_ROOT}/guld/1oz-canadisk-maple-leaf-guldmont/'
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(frozen=True)
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
@ -87,4 +96,4 @@ class TavexScraperSilver(TavexScraperBase):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def page_url() -> str:
|
def page_url() -> str:
|
||||||
return 'https://tavex.dk/solv/1-oz-american-eagle-solvmont-tidligere-argange/'
|
return f'{URL_API_ROOT}/solv/1-oz-american-eagle-solvmont-tidligere-argange/'
|
||||||
|
|
|
@ -43,6 +43,7 @@ if cfscrape:
|
||||||
class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
|
class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
CACHE_EXPIRE_DEFAULT = datetime.timedelta(days=7)
|
||||||
|
|
||||||
def get_session(
|
def get_session(
|
||||||
cookiejar: Sequence,
|
cookiejar: Sequence,
|
||||||
|
@ -57,7 +58,7 @@ def get_session(
|
||||||
return requests.Session()
|
return requests.Session()
|
||||||
if cfscrape:
|
if cfscrape:
|
||||||
session_class = CachedCfScrape
|
session_class = CachedCfScrape
|
||||||
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=datetime.timedelta(days=1))
|
session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT)
|
||||||
for cookie in cookiejar:
|
for cookie in cookiejar:
|
||||||
session.cookies.set_cookie(cookie)
|
session.cookies.set_cookie(cookie)
|
||||||
return session
|
return session
|
||||||
|
|
|
@ -58,6 +58,7 @@ def csv_str_to_value(
|
||||||
return None
|
return None
|
||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
def csv_safe_value(v: object) -> str:
|
def csv_safe_value(v: object) -> str:
|
||||||
if isinstance(v, urllib.parse.ParseResult):
|
if isinstance(v, urllib.parse.ParseResult):
|
||||||
return v.geturl()
|
return v.geturl()
|
||||||
|
@ -91,6 +92,8 @@ def deduplicate_by_ignoring_certain_fields(
|
||||||
for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
|
for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
|
||||||
if equals_without_fields(first, second, deduplicate_ignore_columns):
|
if equals_without_fields(first, second, deduplicate_ignore_columns):
|
||||||
to_remove.add(idx2)
|
to_remove.add(idx2)
|
||||||
|
del idx2, second
|
||||||
|
del idx1, first
|
||||||
|
|
||||||
to_remove = sorted(to_remove)
|
to_remove = sorted(to_remove)
|
||||||
while to_remove:
|
while to_remove:
|
||||||
|
@ -100,10 +103,10 @@ def deduplicate_by_ignoring_certain_fields(
|
||||||
|
|
||||||
|
|
||||||
def deduplicate_dicts(
|
def deduplicate_dicts(
|
||||||
dicts: Sequence[dict],
|
dicts: Sequence[dict[str,typing.Any] | frozendict[str,typing.Any]],
|
||||||
deduplicate_mode: data.DeduplicateMode,
|
deduplicate_mode: data.DeduplicateMode,
|
||||||
deduplicate_ignore_columns: list[str],
|
deduplicate_ignore_columns: list[str],
|
||||||
) -> tuple[Sequence[dict], list[str]]:
|
) -> tuple[Sequence[dict[str,typing.Any]], list[str]]:
|
||||||
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
|
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
|
||||||
|
|
||||||
fieldnames = []
|
fieldnames = []
|
||||||
|
@ -133,7 +136,7 @@ def deduplicate_dicts(
|
||||||
return dicts, fieldnames
|
return dicts, fieldnames
|
||||||
|
|
||||||
|
|
||||||
def normalize_dict(d: dict) -> frozendict:
|
def normalize_dict(d: dict[str,typing.Any]) -> frozendict[str,typing.Any]:
|
||||||
return frozendict(
|
return frozendict(
|
||||||
{k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
|
{k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
|
||||||
)
|
)
|
||||||
|
|
|
@ -17,12 +17,13 @@ def parse_arguments():
|
||||||
|
|
||||||
|
|
||||||
def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
|
def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
|
||||||
|
max_title_parts = 2
|
||||||
|
|
||||||
cal = icalendar.Calendar()
|
cal = icalendar.Calendar()
|
||||||
cal.add('prodid', '-//personal_data_calendar//example.org//')
|
cal.add('prodid', '-//personal_data_calendar//example.org//')
|
||||||
cal.add('version', '2.0')
|
cal.add('version', '2.0')
|
||||||
|
|
||||||
for event_data in rows:
|
for event_data in rows:
|
||||||
print(event_data)
|
|
||||||
|
|
||||||
# Select data
|
# Select data
|
||||||
possible_time_keys = [
|
possible_time_keys = [
|
||||||
|
@ -34,20 +35,20 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
|
||||||
]
|
]
|
||||||
|
|
||||||
date = event_data[possible_time_keys[0]] if possible_time_keys else None
|
date = event_data[possible_time_keys[0]] if possible_time_keys else None
|
||||||
title = event_data[possible_name_keys[0]]
|
|
||||||
image = event_data[possible_image_keys[0]] if possible_image_keys else None
|
image = event_data[possible_image_keys[0]] if possible_image_keys else None
|
||||||
|
|
||||||
if date is None:
|
if date is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
description = '\n\n'.join(event_data[k] for k in possible_name_keys)
|
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
|
||||||
|
description = '\n\n'.join(event_data[k] for k in possible_name_keys[max_title_parts:])
|
||||||
|
|
||||||
# Create event
|
# Create event
|
||||||
event = icalendar.Event()
|
event = icalendar.Event()
|
||||||
event.add('summary', title)
|
event.add('summary', title)
|
||||||
event.add('description', description)
|
event.add('description', description)
|
||||||
event.add('dtstart', date)
|
event.add('dtstart', date)
|
||||||
event.add('dtend', date)
|
event.add('dtend', date + datetime.timedelta(minutes=30))
|
||||||
event.add('created', NOW)
|
event.add('created', NOW)
|
||||||
event.add('dtstamp', NOW)
|
event.add('dtstamp', NOW)
|
||||||
if image:
|
if image:
|
||||||
|
@ -62,7 +63,6 @@ def main():
|
||||||
args = parse_arguments()
|
args = parse_arguments()
|
||||||
|
|
||||||
dicts = load_csv_file(args.data_folder + '/games_played_playstation.csv')
|
dicts = load_csv_file(args.data_folder + '/games_played_playstation.csv')
|
||||||
print(dicts)
|
|
||||||
|
|
||||||
calendar = generate_calendar(dicts)
|
calendar = generate_calendar(dicts)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user