"""Run the registered scrapers and append their results to CSV files."""

import csv
import datetime
import io
import logging

import browsercookie
import requests
import requests_cache
from frozendict import frozendict

# cfscrape is optional: when it cannot be imported the name is bound to None
# and only sessions that do not need it can be created (see get_session).
try:
    import cfscrape
except Exception:
    cfscrape = None

logger = logging.getLogger(__name__)

import personal_data.data
import personal_data.fetchers.crunchyroll
import personal_data.fetchers.ffxiv_lodestone
import personal_data.fetchers.playstation
import personal_data.fetchers.psnprofiles
from personal_data._version import __version__

CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)

logging.basicConfig()
logger.setLevel('INFO')


def try_value(fn, s: str) -> object:
    """Return ``fn(s)``, or ``None`` if ``fn`` raises ``ValueError``.

    Args:
        fn: Callable taking a single string, e.g. ``int``.
        s: The text to convert.
    """
    try:
        return fn(s)
    except ValueError:
        return None


def to_value(s: str) -> object:
    """Convert the text of a CSV cell into a Python value.

    The text is stripped, then the first matching rule wins:

    * empty text -> ``None``
    * integer literal -> ``int``
    * ISO date -> ``datetime.date``
    * ISO datetime -> ``datetime.datetime``
    * ``'true'`` / ``'false'`` (any case) -> ``bool``
    * ``'none'`` (any case) -> ``None``
    * anything else -> the stripped string
    """
    s = s.strip()
    if not s:
        return None

    # Compare against None rather than relying on truthiness: a successfully
    # parsed 0 is falsy and must not fall through to the string fallback.
    for parser in (
        int,
        datetime.date.fromisoformat,
        datetime.datetime.fromisoformat,
    ):
        if (v := try_value(parser, s)) is not None:
            return v

    lowered = s.lower()
    if lowered == 'false':
        return False
    if lowered == 'true':
        return True
    if lowered == 'none':
        return None
    return s


def extend_csv_file(
    filename: str,
    new_dicts: list,
    deduplicate_mode: personal_data.data.DeduplicateMode,
):
    """Merge ``new_dicts`` into the CSV file at ``filename`` and rewrite it.

    Existing rows are read (cells converted with ``to_value``, cells that
    convert to ``None`` dropped), the new rows are added, and the combined
    rows are sorted and written back with a header made of every key seen.
    A missing file is treated as empty and gets created.

    Args:
        filename: Path of the CSV file to extend.
        new_dicts: Rows to add, each one a mapping from column name to value.
        deduplicate_mode: Unless this is ``DeduplicateMode.NONE``, identical
            rows are collapsed into one.
    """
    dicts = []
    try:
        # newline='' is what the csv module asks for on file objects.
        with open(filename, 'r', newline='') as csvfile:
            reader = csv.DictReader(csvfile, dialect=CSV_DIALECT)
            for row in reader:
                parsed = {}
                for key, raw in row.items():
                    value = to_value(raw)
                    if value is not None:
                        parsed[key] = value
                dicts.append(frozendict(parsed))
    except FileNotFoundError:
        logger.info('Creating file: %s', filename)

    original_num_dicts = len(dicts)
    dicts.extend(frozendict(d) for d in new_dicts)

    # Ordered, duplicate-free list of column names in first-seen order.
    fieldnames = list(dict.fromkeys(k for d in dicts for k in d))

    if deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
        dicts = set(dicts)

    dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))

    # Render the whole file in memory first so the target is only opened for
    # writing once the output is complete.
    csvfile_in_memory = io.StringIO()
    writer = csv.DictWriter(
        csvfile_in_memory,
        fieldnames=fieldnames,
        dialect=CSV_DIALECT,
    )
    writer.writeheader()
    writer.writerows(dicts)
    output_csv = csvfile_in_memory.getvalue()

    with open(filename, 'w', newline='') as csvfile:
        csvfile.write(output_csv)

    logger.warning(
        'Extended CSV "%s" from %d to %d lines',
        filename,
        original_num_dicts,
        len(dicts),
    )


STANDARD_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0',
    # "Accept": "application/json, text/plain, */*",
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate, br',
}


if cfscrape is not None:

    class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
        """``cfscrape.CloudflareScraper`` with the requests_cache mixin applied."""

else:
    # Keep the name defined so the module still imports without cfscrape.
    CachedCfScrape = None


def get_session(cookiejar, *, with_cfscrape: bool) -> requests.Session:
    """Create a cached HTTP session pre-loaded with the cookies in ``cookiejar``.

    Args:
        cookiejar: Iterable of cookies to copy into the session.
        with_cfscrape: When true the session is a ``CachedCfScrape``;
            otherwise it is a plain ``requests_cache.CachedSession``.

    Raises:
        RuntimeError: If ``with_cfscrape`` is true but cfscrape could not be
            imported.
    """
    assert isinstance(with_cfscrape, bool)
    if with_cfscrape:
        if CachedCfScrape is None:
            raise RuntimeError(
                'cfscrape is required for this session but could not be imported',
            )
        session = CachedCfScrape('web_cache', cookies=cookiejar)
    else:
        session = requests_cache.CachedSession('web_cache', cookies=cookiejar)
    for cookie in cookiejar:
        session.cookies.set_cookie(cookie)
    return session


def main():
    """Run the selected scrapers and append their rows to ``output/<dataset>``.

    Cookies are taken from Firefox. A scraper that fails with an HTTP error
    is logged and skipped; its CSV file is left untouched.
    """
    cookiejar = browsercookie.firefox()
    logger.warning('Got cookiejar from firefox: %s cookies', len(cookiejar))

    scraper_filter = {'PsnProfilesScraper'}

    for scraper_cls in personal_data.data.Scraper.__subclasses__():
        # Filter first so no session or scraper is built for skipped classes.
        if scraper_cls.__name__ not in scraper_filter:
            continue

        session = get_session(cookiejar, with_cfscrape=scraper_cls.requires_cfscrape())
        scraper = scraper_cls(session)
        logger.warning(
            'Running %s, appending to "%s"',
            scraper_cls.__name__,
            scraper.dataset_name,
        )

        try:
            result_rows = list(scraper.scrape())
        except requests.exceptions.HTTPError:
            logger.exception('Failed in running %s', scraper_cls.__name__)
            continue

        extend_csv_file(
            'output/' + scraper.dataset_name,
            result_rows,
            deduplicate_mode=scraper.deduplicate_mode,
        )
        logger.warning('Scraper done: %s', scraper.dataset_name)