diff --git a/.gitignore b/.gitignore
index 41180b8..3449270 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,4 @@ __pycache__/
 /.coverage
 /.hypothesis/
 /htmlcov/
+.aider*
diff --git a/git_time_tracker/format/cli.py b/git_time_tracker/format/cli.py
index 0293c55..44605f7 100644
--- a/git_time_tracker/format/cli.py
+++ b/git_time_tracker/format/cli.py
@@ -35,7 +35,9 @@ def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
     label_str = str(label.label)
     return f' {label.category:20} {label_str:50} {hours:-4d}h {minutes:-2d}m'
 
-LINE_LENGTH = len(fmt_line(Label('',''), datetime.timedelta()))
+
+LINE_LENGTH = len(fmt_line(Label('', ''), datetime.timedelta()))
+
 
 def generate_report(
     samples: list[RealizedActivitySample],
diff --git a/obsidian_import/__init__.py b/obsidian_import/__init__.py
index 925feb5..e1720ab 100644
--- a/obsidian_import/__init__.py
+++ b/obsidian_import/__init__.py
@@ -31,6 +31,7 @@ HOUR = datetime.timedelta(hours=1)
 MINUTE = datetime.timedelta(minutes=1)
 SECOND = datetime.timedelta(seconds=1)
 
+
 def to_text_duration(duration: datetime.timedelta) -> str:
     hours = int(duration / HOUR)
     duration -= hours * HOUR
@@ -129,6 +130,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
 
     return num_updated
 
+
 def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0
 
@@ -141,22 +143,32 @@ def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
 
     COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']
 
-    def all_steps(row: dict[str,int]):
+    def all_steps(row: dict[str, int]):
         return sum(row[column] for column in COLUMNS)
 
     steps_per_date = {
-        date: sum(all_steps(row) for row in rows) for date, rows in rows_per_date.items()
+        date: sum(all_steps(row) for row in rows)
+        for date, rows in rows_per_date.items()
     }
     duration_per_date = {
-        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta()) for date, rows in rows_per_date.items()
+        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta())
+        for date, rows in rows_per_date.items()
     }
 
     print(steps_per_date)
     print(duration_per_date)
 
     for date in steps_per_date:
-        was_updated_1 = vault.add_statistic(date, 'Stepmania (Steps)', int(steps_per_date[date]))
-        was_updated_2 = vault.add_statistic(date, 'Stepmania (Duration)', to_text_duration(duration_per_date[date]))
+        was_updated_1 = vault.add_statistic(
+            date,
+            'Stepmania (Steps)',
+            int(steps_per_date[date]),
+        )
+        was_updated_2 = vault.add_statistic(
+            date,
+            'Stepmania (Duration)',
+            to_text_duration(duration_per_date[date]),
+        )
         if was_updated_1 or was_updated_2:
             num_updated += 1
         del date, was_updated_1, was_updated_2
@@ -247,27 +259,47 @@ def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
 
 PATH_WATCHED = Path('output/show_episodes_watched.csv')
 PATH_PLAYED = Path('output/games_played.csv')
-PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv') 
+PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
 PATH_STEP_COUNTS = Path(
     '/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
 )
-PATH_STEPMANIA = Path('output/stepmania.csv') 
+PATH_STEPMANIA = Path('output/stepmania.csv')
 
 
 IMPORTERS = [
-    {'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
-    {'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
-    {'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
-    {'path': PATH_PLAYED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_games_played_content, group_category='game.name',) },
-    {'path': PATH_WATCHED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_watched_series_content) },
+    {'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
+    {'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
+    {'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
+    {
+        'path': PATH_PLAYED,
+        'import_rows': lambda vault, rows: import_activity_sample_csv(
+            vault,
+            rows,
+            map_games_played_content,
+            group_category='game.name',
+        ),
+    },
+    {
+        'path': PATH_WATCHED,
+        'import_rows': lambda vault, rows: import_activity_sample_csv(
+            vault,
+            rows,
+            map_watched_series_content,
+        ),
+    },
 ]
 
+
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
 
     for import_def in IMPORTERS:
         if not import_def['path'].exists():
-            logger.warning('Skipping %s: %s is missing', import_def['import_rows'], import_def['path'])
+            logger.warning(
+                'Skipping %s: %s is missing',
+                import_def['import_rows'],
+                import_def['path'],
+            )
             continue
         rows = load_csv_file(import_def['path'])
         logger.info('Loaded CSV with %d lines', len(rows))
@@ -275,7 +307,6 @@ def import_data(obsidian_path: Path, dry_run=True):
         logger.info('Updated %d files', num_files_updated)
         del import_def, rows
 
-
     num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
     logger.info('dirty files in cache: %d', num_dirty)
     logger.info(
diff --git a/personal_data/csv_import.py b/personal_data/csv_import.py
index 0ef6435..cc12d35 100644
--- a/personal_data/csv_import.py
+++ b/personal_data/csv_import.py
@@ -114,7 +114,7 @@ class PossibleKeys:
     misc: list[str]
 
 
-def is_duration_key(k,v):
+def is_duration_key(k, v):
     if isinstance(v, Decimal) and 'duration_seconds' in k:
         return True
     if isinstance(v, datetime.timedelta) and 'duration' in k:
@@ -125,11 +125,7 @@ class PossibleKeys:
 def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
     # Select data
     time_keys = [k for k, v in event_data.items() if isinstance(v, datetime.date)]
-    duration_keys = [
-        k
-        for k, v in event_data.items()
-        if is_duration_key(k,v)
-    ]
+    duration_keys = [k for k, v in event_data.items() if is_duration_key(k, v)]
     name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
     image_keys = [
         k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
diff --git a/personal_data/fetchers/gitea.py b/personal_data/fetchers/gitea.py
index 246ac3b..760e301 100644
--- a/personal_data/fetchers/gitea.py
+++ b/personal_data/fetchers/gitea.py
@@ -1,8 +1,6 @@
 import dataclasses
-import datetime
 import logging
 from collections.abc import Iterator, Mapping
-from decimal import Decimal
 from typing import Any
 
 from personal_data.data import DeduplicateMode, Scraper
@@ -11,16 +9,19 @@ from .. import secrets
 
 logger = logging.getLogger(__name__)
 
+
 def safe_del(d: dict, *keys: str):
     for key in keys:
         if key in d:
             del d[key]
 
-def to_data_point(p: dict[str,Any]) ->Mapping[str, Any]:
+
+def to_data_point(p: dict[str, Any]) -> Mapping[str, Any]:
     p['owner'] = p['owner']['login']
     safe_del(p, 'permissions', 'internal_tracker')
     return p
 
+
 @dataclasses.dataclass(frozen=True)
 class Gitea(Scraper):
     dataset_name = 'gitea_repos'
@@ -32,13 +33,16 @@ class Gitea(Scraper):
         return False
 
     def scrape(self) -> Iterator[Mapping[str, Any]]:
-        response = self.session.get('https://gitfub.space/api/v1/repos/search', params = {
-            #'uid':21,
-            'private': True,
-            'sort':'updated',
-            'order':'desc',
-            'access_token': secrets.gitea_access_token(),
-        })
+        response = self.session.get(
+            'https://gitfub.space/api/v1/repos/search',
+            params={
+                #'uid':21,
+                'private': True,
+                'sort': 'updated',
+                'order': 'desc',
+                'access_token': secrets.gitea_access_token(),
+            },
+        )
         response.raise_for_status()
 
         data = response.json()
diff --git a/personal_data/fetchers/myanimelist.py b/personal_data/fetchers/myanimelist.py
index 486ccf8..465cb9e 100644
--- a/personal_data/fetchers/myanimelist.py
+++ b/personal_data/fetchers/myanimelist.py
@@ -1,18 +1,17 @@
-import abc
-import bs4
+import dataclasses
+import json
+import logging
 import re
 import urllib.parse
-import json
-import dataclasses
-import logging
-import secrets
-from collections.abc import Iterator, Mapping
-from enum import Enum
+from collections.abc import Iterator
+
+import bs4
 
 from personal_data.data import DeduplicateMode, Scraper
 
 logger = logging.getLogger(__name__)
 
+
 @dataclasses.dataclass(frozen=True)
 class MyAnimeListAnime:
     series_name_eng: str
@@ -21,6 +20,7 @@ class MyAnimeListAnime:
     series_icon: urllib.parse.ParseResult
     me_score: int
 
+
 @dataclasses.dataclass(frozen=True)
 class MyAnimeListSong:
     song_name_eng: str
@@ -48,25 +48,39 @@ class MyAnimeList(Scraper):
 
         for data_item in data_items:
             yield MyAnimeListAnime(
-                series_name_eng= data_item.get('anime_title_eng') or data_item.get('anime_title'),
-                series_name= data_item.get('anime_title') or data_item.get('anime_title_eng'),
-                series_myanimelist_url= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_url'])),
-                series_icon= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_image_path'])),
-                me_score= data_item.get('score'),
+                series_name_eng=data_item.get('anime_title_eng')
+                or data_item.get('anime_title'),
+                series_name=data_item.get('anime_title')
+                or data_item.get('anime_title_eng'),
+                series_myanimelist_url=urllib.parse.urlparse(
+                    urllib.parse.urljoin(url, data_item['anime_url']),
+                ),
+                series_icon=urllib.parse.urlparse(
+                    urllib.parse.urljoin(url, data_item['anime_image_path']),
+                ),
+                me_score=data_item.get('score'),
             )
 
             del data_item
 
+
 def parse_name(text: str):
     match = re.fullmatch(r'^(?:\d+:\s*)?"(.*?)(?:\((.*)\))?"$', text)
     return match
 
+
 assert parse_name('"Soundscape"')
 assert parse_name('"Soundscape (サウンドスケープ)"').group(2) is not None
 assert parse_name('1: "Soundscape"')
 assert parse_name('2: "Soundscape (サウンドスケープ)"').group(2) is not None
 
-def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
+
+def parse_songs(
+    tr_elements,
+    song_position: str,
+    series_name_eng: str,
+    series_name: str,
+):
     print(series_name_eng, len(tr_elements))
     for song_tr in tr_elements:
         artist = song_tr.select_one('.theme-song-artist')
@@ -77,26 +91,26 @@ def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_na
             e.extract()
             del e
 
-        
 
         song_artist = artist.get_text().strip().removeprefix('by ')
         song_name_eng = song_tr.get_text().strip()
-        m = parse_name(song_name_eng )
+        m = parse_name(song_name_eng)
         song_name_eng = m.group(1).strip()
         song_name_jp = m.group(2).strip() if m.group(2) else None
 
-        song= MyAnimeListSong(
-            song_name_eng = song_name_eng ,
-            song_name_jp = song_name_jp ,
-            song_artist = song_artist,
-            song_placement = song_position,
-            series_name_eng = series_name_eng,
-            series_name = series_name,
+        song = MyAnimeListSong(
+            song_name_eng=song_name_eng,
+            song_name_jp=song_name_jp,
+            song_artist=song_artist,
+            song_placement=song_position,
+            series_name_eng=series_name_eng,
+            series_name=series_name,
         )
         print(' ', song_name_eng)
 
         yield song
 
+
 @dataclasses.dataclass(frozen=True)
 class MyAnimeListSongs(Scraper):
     dataset_name = 'myanimelist_songs'
@@ -113,10 +127,18 @@ class MyAnimeListSongs(Scraper):
         for script in soup.select('.oped-popup'):
             script.extract()
 
-        yield from parse_songs(soup.select('.theme-songs.opnening table tr'),
-                'opening', anime.series_name_eng, anime.series_name)
-        yield from parse_songs(soup.select('.theme-songs.ending table tr'),
-                'ending', anime.series_name_eng, anime.series_name)
+        yield from parse_songs(
+            soup.select('.theme-songs.opnening table tr'),
+            'opening',
+            anime.series_name_eng,
+            anime.series_name,
+        )
+        yield from parse_songs(
+            soup.select('.theme-songs.ending table tr'),
+            'ending',
+            anime.series_name_eng,
+            anime.series_name,
+        )
 
     def scrape(self) -> Iterator[MyAnimeListSong]:
         for anime in MyAnimeList(self.session).scrape():
diff --git a/personal_data/fetchers/stepmania.py b/personal_data/fetchers/stepmania.py
index 159f0e1..e9a568e 100644
--- a/personal_data/fetchers/stepmania.py
+++ b/personal_data/fetchers/stepmania.py
@@ -1,20 +1,20 @@
 import dataclasses
 import datetime
 import logging
-import datetime
-from collections.abc import Iterator, Mapping
-from decimal import Decimal
-from pathlib import Path
-import bs4
 import zoneinfo
+from collections.abc import Iterator, Mapping
+from pathlib import Path
+
+import bs4
 
 from personal_data.data import DeduplicateMode, Scraper
-from .. import secrets
-
 
 logger = logging.getLogger(__name__)
 
-STATS_FILE_PATH: Path = Path('/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml')
+STATS_FILE_PATH: Path = Path(
+    '/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml',
+)
+
 
 @dataclasses.dataclass(frozen=True)
 class Stepmania(Scraper):
@@ -33,30 +33,32 @@ class Stepmania(Scraper):
         # Derp
         for score in soup.select('SongScores Song HighScoreList HighScore'):
             song = score.parent.parent.parent
-            song_path = Path(song ['Dir'].removesuffix('/'))
+            song_path = Path(song['Dir'].removesuffix('/'))
 
             disqualified = score.select_one('Disqualified').get_text().strip() != '0'
             if disqualified:
                 logger.warning('Ignored disqualified')
                 continue
 
-            play_start = datetime.datetime.fromisoformat(score.select_one('DateTime').get_text())
+            play_start = datetime.datetime.fromisoformat(
+                score.select_one('DateTime').get_text(),
+            )
             play_start = play_start.replace(tzinfo=timezone).astimezone(datetime.UTC)
             play_seconds = float(score.select_one('SurviveSeconds').get_text())
 
             yield {
-                    'song.name': song_path.stem,
-                    'song.pack': song_path.parent.stem,
-                    'song.difficulty': score.parent.parent['Difficulty'],
-                    'song.grade': score.select_one('Grade').get_text(),
-                    'play.start': play_start,
-                    'play.duration': datetime.timedelta(seconds=play_seconds),
-                    'score.score': float(score.select_one('PercentDP').get_text()),
-                    'score.w1': int(score.select_one('W1').get_text()),
-                    'score.w2': int(score.select_one('W2').get_text()),
-                    'score.w3': int(score.select_one('W3').get_text()),
-                    'score.w4': int(score.select_one('W4').get_text()),
-                    'score.w5': int(score.select_one('W5').get_text()),
-                    'score.miss': int(score.select_one('Miss').get_text()),
+                'song.name': song_path.stem,
+                'song.pack': song_path.parent.stem,
+                'song.difficulty': score.parent.parent['Difficulty'],
+                'song.grade': score.select_one('Grade').get_text(),
+                'play.start': play_start,
+                'play.duration': datetime.timedelta(seconds=play_seconds),
+                'score.score': float(score.select_one('PercentDP').get_text()),
+                'score.w1': int(score.select_one('W1').get_text()),
+                'score.w2': int(score.select_one('W2').get_text()),
+                'score.w3': int(score.select_one('W3').get_text()),
+                'score.w4': int(score.select_one('W4').get_text()),
+                'score.w5': int(score.select_one('W5').get_text()),
+                'score.miss': int(score.select_one('Miss').get_text()),
             }
 
diff --git a/personal_data/secrets.py b/personal_data/secrets.py
index 078a255..d099911 100644
--- a/personal_data/secrets.py
+++ b/personal_data/secrets.py
@@ -10,6 +10,7 @@ CRUNCHYROLL_AUTH = secrets.load('CRUNCHYROLL_AUTH')
 # FFXIV
 FFXIV_CHARACTER_ID = secrets.load('FFXIV_CHARACTER_ID')
 
+
 # Playstation
 def playstation_psn_id():
     return secrets.load_or_fail('PLAYSTATION_PSN_ID')
@@ -24,6 +25,7 @@ def pbc_account_address():
 def steam_username():
     return secrets.load_or_fail('STEAM_USERNAME')
 
+
 # Gitea
 def gitea_access_token():
     return secrets.load('GITEA_ACCESS_TOKEN')
diff --git a/personal_data/util.py b/personal_data/util.py
index 9aa3de8..f013280 100644
--- a/personal_data/util.py
+++ b/personal_data/util.py
@@ -1,10 +1,8 @@
 import _csv
 import csv
 import dataclasses
-import datetime
 import io
 import logging
-import urllib.parse
 from collections.abc import Iterable, Mapping
 from pathlib import Path
 from typing import Any
@@ -91,15 +89,18 @@ def deduplicate_dicts(
 
 def dataclass_to_dict(obj) -> dict[str, Any]:
     d = dataclasses.asdict(obj)
-    return {k.replace('_','.',1):v for k,v in d.items()}
+    return {k.replace('_', '.', 1): v for k, v in d.items()}
 
 
 def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
     if not isinstance(d, dict) and not isinstance(d, frozendict):
         d = dataclass_to_dict(d)
     assert isinstance(d, dict) or isinstance(d, frozendict), 'Not a dict'
-    safe_values = [(k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v))) for k, v in d.items() ]
-    return frozendict( {k:v for k,v in safe_values if v is not None})
+    safe_values = [
+        (k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v)))
+        for k, v in d.items()
+    ]
+    return frozendict({k: v for k, v in safe_values if v is not None})
 
 
 def extend_csv_file(
diff --git a/scripts/download_simfiles.py b/scripts/download_simfiles.py
index 5b1e511..9aa5055 100644
--- a/scripts/download_simfiles.py
+++ b/scripts/download_simfiles.py
@@ -1,17 +1,16 @@
-import sys
-import bs4
-import zipfile
-import subprocess
-import csv
-import requests
-from pathlib import Path
-import personal_data.csv_import
-import personal_data.main
 import dataclasses
 import logging
+import subprocess
+from pathlib import Path
+
+import bs4
+
+import personal_data.csv_import
+import personal_data.main
 
 logger = logging.getLogger(__name__)
 
+
 @dataclasses.dataclass
 class Result:
     title: str
@@ -19,9 +18,8 @@ class Result:
     levels: str
 
 
-SESSION = personal_data.main.get_session(
-    [],
-    with_cfscrape=False, ignore_cache=False)
+SESSION = personal_data.main.get_session([], with_cfscrape=False, ignore_cache=False)
+
 
 def parse_results(response) -> list[Result]:
     soup = bs4.BeautifulSoup(response.text, 'lxml')
@@ -36,34 +34,40 @@ def parse_results(response) -> list[Result]:
             continue
         id = link['href'].removeprefix('viewsimfile.php?simfileid=')
         levels = cells[1].get_text().strip()
-        results.append(Result(title , int(id), levels))
+        results.append(Result(title, int(id), levels))
 
     return results
 
+
 def search_for_song(song_data) -> Result | None:
-    response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
-        data={
-            'songtitle': song_data['song.name_eng'],
-            'songartist': song_data['song.artist'],
-        })
+    response = SESSION.post(
+        'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
+        data={
+            'songtitle': song_data['song.name_eng'],
+            'songartist': song_data['song.artist'],
+        },
+    )
     if results := parse_results(response):
         return results[0]
 
-    response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
-        data={
-            'songtitle': song_data['song.name_eng'],
-            'songartist': '',
-        })
+    response = SESSION.post(
+        'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
+        data={
+            'songtitle': song_data['song.name_eng'],
+            'songartist': '',
+        },
+    )
     if results := parse_results(response):
         return results[0]
 
     logger.warning('No results for %s', song_data['song.name_eng'])
     return None
 
+
 def download_song(song_data, output_dir: Path):
     song_result = search_for_song(song_data)
    if song_result is None:
        return
-    path_zip = output_dir/f'zenius-{song_result.id}-{song_result.title}.zip'
+    path_zip = output_dir / f'zenius-{song_result.id}-{song_result.title}.zip'
     if path_zip.exists():
         logger.warning('Skipping existing file')
         return
@@ -75,10 +79,11 @@ def download_song(song_data, output_dir: Path):
     cmd = ['curl', '-L', '--fail', url, '-o', path_zip]
     subprocess.run(cmd, check=True, capture_output=True)
 
+
 def main():
     csv_path = Path('./output/myanimelist_songs.csv')
     output_path = Path('./output/songs')
-    output_path.mkdir(exist_ok=True,parents=True)
+    output_path.mkdir(exist_ok=True, parents=True)
 
     songs = personal_data.csv_import.load_csv_file(csv_path)
     for song in songs:
@@ -88,5 +93,3 @@ def main():
 
 if __name__ == '__main__':
     main()
-
-
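
Reviewer notes (not part of the patch):

1. On the timedelta summation kept in obsidian_import/__init__.py: sum()
   defaults to start=0, and 0 + timedelta raises TypeError, so the explicit
   start=datetime.timedelta() in duration_per_date is required, not cosmetic.
   A minimal standalone sketch of the idiom (values are illustrative):

       import datetime

       durations = [datetime.timedelta(minutes=3), datetime.timedelta(seconds=90)]
       # Without start=, sum() would compute 0 + timedelta and raise TypeError.
       total = sum(durations, start=datetime.timedelta())
       assert total == datetime.timedelta(minutes=4, seconds=30)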
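
2. On the IMPORTERS table in obsidian_import/__init__.py: the reformatting
   makes the path-to-handler registry easier to scan; import_data() dispatches
   through it and skips sources whose file is missing. A sketch of the dispatch
   shape, with a hypothetical path, handler, and row loader standing in for the
   real ones:

       from pathlib import Path

       def import_example_csv(vault, rows) -> int:
           # Hypothetical handler: returns the number of files updated.
           return 0

       IMPORTERS = [
           {'path': Path('output/example.csv'), 'import_rows': import_example_csv},
       ]

       def import_data(vault, load_rows):
           for import_def in IMPORTERS:
               if not import_def['path'].exists():
                   continue  # missing inputs are skipped, not fatal
               rows = load_rows(import_def['path'])  # stand-in for load_csv_file
               import_def['import_rows'](vault, rows)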
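
3. On the timestamp handling in personal_data/fetchers/stepmania.py: the
   DateTime values read from Stats.xml are naive local timestamps, so the code
   attaches the local zone with replace(tzinfo=...) before normalizing with
   astimezone(datetime.UTC). A sketch under an assumed zone (the fetcher
   derives its own; datetime.UTC requires Python 3.11+):

       import datetime
       import zoneinfo

       timezone = zoneinfo.ZoneInfo('Europe/Copenhagen')  # assumed for illustration
       naive = datetime.datetime.fromisoformat('2024-09-21 18:30:00')
       # Attach the local zone to the naive value, then convert to UTC.
       play_start = naive.replace(tzinfo=timezone).astimezone(datetime.UTC)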