
Compare commits


No commits in common. "96a2e2bed96e922a87612385fe52bba97799be98" and "b67089f9111991ca956f0cbc2e6c7d975cee3bd9" have entirely different histories.

8 changed files with 39 additions and 49 deletions

View File

@@ -19,7 +19,7 @@ URL_PROFILE_MOUNTS = (
 )

 FFXIV_ARR_NAME = 'Final Fantasy XIV: A Realm Reborn'
-FFXIV_ARR_RELEASE_DATE = datetime.date(2013, 8, 27)
+FFXIV_ARR_RELEASE_DATE = datetime.date(2013,8,27)


 @dataclasses.dataclass(frozen=True)

View File

@@ -52,9 +52,7 @@ class SteamAchievement(Scraper):
         yield appid

     def _scrape_app_achievements(
-        self,
-        username: str,
-        appid: int,
+        self, username: str, appid: int,
     ) -> Iterator[dict[str, Any]]:
         url = URL_GAME_ACHIVEMENTS.format(
             username=username,

View File

@@ -21,9 +21,9 @@ FORMAT_DATE_HEADER = '%a, %d %b %Y %H:%M:%S GMT'
 def parse_duration(text: str) -> datetime.timedelta:
-    (num_str, unit_str) = text.split(' ')
-    num = int(num_str)
-    unit = DATETIME_UNITS[unit_str]
+    (num, unit) = text.split(' ')
+    num = int(num)
+    unit = DATETIME_UNITS[unit]
     return unit * num
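For reviewers unfamiliar with this helper: parse_duration splits text like '3 days' into a count and a unit name, then scales a base timedelta looked up in DATETIME_UNITS. A minimal sketch, assuming DATETIME_UNITS maps unit names to one-unit timedeltas (the mapping below is illustrative, not the repository's actual table):

    # Illustrative sketch only; this DATETIME_UNITS is an assumption.
    import datetime

    DATETIME_UNITS = {
        'minutes': datetime.timedelta(minutes=1),
        'hours': datetime.timedelta(hours=1),
        'days': datetime.timedelta(days=1),
    }

    def parse_duration(text: str) -> datetime.timedelta:
        (num_str, unit_str) = text.split(' ')
        return DATETIME_UNITS[unit_str] * int(num_str)

    assert parse_duration('3 days') == datetime.timedelta(days=3)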
@@ -57,16 +57,12 @@ def parse_time(text: str) -> datetime.datetime:
     if time is None and (m := try_parse(text, '%d %b @ %I:%M%p')):
         time = m.replace(year=NOW.year)
-    if time is None:
-        msg = 'Unknown format: ' + text
-        raise RuntimeError(msg)
+    assert time is not None, 'Could not parse format'
     if time.tzinfo is None:
         time = time.replace(tzinfo=LOCAL_TIMEZONE)
-    if time.tzinfo is None:
-        msg = 'Could not parse timezone: ' + text
-        raise RuntimeError(msg)
+    assert time.tzinfo is not None, time
     return time
@@ -78,5 +74,4 @@ def parse_date(text: str) -> datetime.date:
         return dt.date()
     if dt := try_parse(text, '%B %d, %Y'):
         return dt.date()
-    msg = 'Unknown format: ' + text
-    raise RuntimeError(msg)
+    assert False, text
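One behavioural note on this file's recurring change: the right-hand side swaps explicit raises for assert statements. CPython strips asserts entirely when run with python -O, so the two sides differ under optimized execution. A minimal illustration (function names here are made up for the demo):

    def validate_with_raise(time: object) -> None:
        if time is None:
            msg = 'Unknown format'
            raise RuntimeError(msg)  # still raises under `python -O`

    def validate_with_assert(time: object) -> None:
        assert time is not None, 'Could not parse format'  # compiled out by `python -O`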

View File

@@ -3,8 +3,9 @@ import csv
 import datetime
 import io
 import logging
+import typing
 import urllib.parse
-from collections.abc import Iterable, Mapping
+from collections.abc import Iterable, Mapping, Sequence
 from pathlib import Path
 from typing import Any
@@ -19,14 +20,13 @@ def csv_safe_value(v: Any) -> str:
     if isinstance(v, urllib.parse.ParseResult):
         return v.geturl()
     if isinstance(v, datetime.datetime):
-        if v.tzinfo is None:
-            raise RuntimeError(v)
+        assert v.tzinfo is not None, v
     return str(v)


 def equals_without_fields(
-    a: Mapping[str, Any],
-    b: Mapping[str, Any],
+    a: Mapping[str, object],
+    b: Mapping[str, object],
     fields: Iterable[str] = frozenset(),
 ) -> bool:
     a = dict(a)
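The hunk cuts off mid-function, so for context: equals_without_fields presumably compares two mappings after dropping the ignored keys. A hedged reconstruction of that behaviour, not the repository's exact body:

    # Hedged reconstruction; the real body continues past the hunk.
    def equals_without_fields(a, b, fields=frozenset()):
        a = dict(a)
        b = dict(b)
        for f in fields:
            a.pop(f, None)
            b.pop(f, None)
        return a == b

    assert equals_without_fields({'x': 1, 't': 10}, {'x': 1, 't': 99}, fields={'t'})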
@@ -39,13 +39,14 @@ def equals_without_fields(
 def deduplicate_by_ignoring_certain_fields(
-    dicts: list[frozendict[str, Any]],
+    dicts: list[dict],
     deduplicate_ignore_columns: Iterable[str],
-) -> list[frozendict[str, Any]]:
+) -> list[dict]:
     """Removes duplicates that occur when ignoring certain columns.

     Output order is stable.
     """
     to_remove = set()
     for idx1, first in enumerate(dicts):
         for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
@@ -54,21 +55,19 @@ def deduplicate_by_ignoring_certain_fields(
             del idx2, second
         del idx1, first
-    to_remove_ls = sorted(to_remove)
-    del to_remove
-    while to_remove_ls:
-        del dicts[to_remove_ls.pop()]
+    to_remove = sorted(to_remove)
+    while to_remove:
+        del dicts[to_remove.pop()]
     return dicts


 def deduplicate_dicts(
-    dicts: list[frozendict[str, Any]],
+    dicts: Sequence[dict[str, typing.Any] | frozendict[str, typing.Any]],
     deduplicate_mode: data.DeduplicateMode,
     deduplicate_ignore_columns: list[str],
-) -> tuple[list[frozendict[str, Any]], list[str]]:
-    if not isinstance(deduplicate_ignore_columns, list):
-        raise TypeError(deduplicate_ignore_columns)
+) -> tuple[Sequence[dict[str, typing.Any]], list[str]]:
+    assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
     fieldnames = []
     for d in dicts:
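Both sides of deduplicate_by_ignoring_certain_fields use the same idiom when removing the collected indices: sort them, then pop() from the end, so each deletion never shifts an index that is still pending. A small standalone demonstration:

    # Deleting from the highest index down keeps lower indices valid.
    items = ['a', 'b', 'c', 'd', 'e']
    to_remove = sorted({1, 3})
    while to_remove:
        del items[to_remove.pop()]  # pop() returns the largest index first
    assert items == ['a', 'c', 'e']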
@@ -79,7 +78,7 @@ def deduplicate_dicts(
         del d

     if deduplicate_mode == data.DeduplicateMode.ONLY_LATEST:
-        while len(dicts) > 1 and equals_without_fields(
+        while len(dicts) >= 2 and equals_without_fields(
             dicts[-1],
             dicts[-2],
             deduplicate_ignore_columns,
@@ -91,13 +90,13 @@ def deduplicate_dicts(
             deduplicate_ignore_columns,
         )
     elif deduplicate_mode != data.DeduplicateMode.NONE:
-        dicts = list(set(dicts))
+        dicts = set(dicts)
     dicts = sorted(dicts, key=lambda d: tuple(str(d.get(fn, '')) for fn in fieldnames))
     return dicts, fieldnames


-def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
+def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
     return frozendict(
         {
             k: csv_import.csv_str_to_value(str(v))
@@ -109,23 +108,20 @@ def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str,
 def extend_csv_file(
     csv_file: Path,
-    new_dicts: list[dict[str, Any] | frozendict[str, Any]],
+    new_dicts: list[dict[str, typing.Any]],
     deduplicate_mode: data.DeduplicateMode,
     deduplicate_ignore_columns: list[str],
 ) -> dict:
-    if not isinstance(deduplicate_ignore_columns, list):
-        raise TypeError(deduplicate_ignore_columns)
+    assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
     try:
-        original_dicts = csv_import.load_csv_file(csv_file)
-    except (FileNotFoundError, _csv.Error):
+        dicts = csv_import.load_csv_file(csv_file)
+    except (FileNotFoundError, _csv.Error) as e:
         logger.info('Creating file: %s', csv_file)
-        original_dicts = []
-    original_num_dicts = len(original_dicts)
-    dicts = [normalize_dict(d) for d in original_dicts] + [
-        normalize_dict(d) for d in new_dicts
-    ]
+        dicts = []
+    original_num_dicts = len(dicts)
+    dicts += [normalize_dict(d) for d in new_dicts]
     del new_dicts
     dicts, fieldnames = deduplicate_dicts(
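For context, a hypothetical call site for extend_csv_file; the path, the row keys, and the choice of BY_ALL_COLUMNS are assumptions for illustration, not taken from this diff:

    # Hypothetical usage; the values below are assumptions.
    from pathlib import Path

    extend_csv_file(
        csv_file=Path('output/games.csv'),
        new_dicts=[{'game.name': 'Example', 'me.last_played_time': '2024-01-01'}],
        deduplicate_mode=data.DeduplicateMode.BY_ALL_COLUMNS,
        deduplicate_ignore_columns=[],
    )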

View File

@@ -39,7 +39,7 @@ def test_all_fields():
 ]


-def test_all_fields_for_duplicated_list():
+def test_all_fields():
     ls, fields = deduplicate_dicts(LIST + LIST, DeduplicateMode.BY_ALL_COLUMNS, ['t'])
     assert fields == ['a', 'b', 't']
     print(ls)

View File

@@ -54,6 +54,6 @@ PARSE_MAPPINGS = [
 ]


-@pytest.mark.parametrize(('text', 'parsed'), PARSE_MAPPINGS)
-def test_csv_str_to_value(text: str, parsed: object):
+@pytest.mark.parametrize('text,parsed', PARSE_MAPPINGS)
+def test_csv_str_to_value(text, parsed):
     assert csv_str_to_value(text) == parsed, text
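Both parametrize spellings are valid pytest: the left side passes a tuple of argument names, the right a single comma-separated string. A minimal self-contained example of the annotated tuple form, as on the left (the test body is a placeholder):

    import pytest

    # pytest accepts either a tuple/list of names or one comma-separated string.
    @pytest.mark.parametrize(('text', 'parsed'), [('7', 7), ('x', 'x')])
    def test_example(text: str, parsed: object):
        assert text is not None and parsed is not None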

View File

@@ -10,6 +10,6 @@ URLS_AND_IDS = [
 ]


-@pytest.mark.parametrize(('psnprofiles_id', 'url'), URLS_AND_IDS)
-def test_game_psnprofiles_id_from_url(psnprofiles_id: int, url: str):
-    assert psnprofiles.game_psnprofiles_id_from_url(url) == psnprofiles_id
+@pytest.mark.parametrize('id, url', URLS_AND_IDS)
+def test_game_psnprofiles_id_from_url(id, url):
+    assert psnprofiles.game_psnprofiles_id_from_url(url) == id

View File

@@ -2,4 +2,5 @@ import personal_data
 def test_version():
+    assert personal_data._version.__version__ is not None
     assert personal_data.__version__ is not None