Jon Michael Aanes 2025-03-15 21:54:56 +01:00
parent a0e8d1ec28
commit 3d9c694fe8
10 changed files with 177 additions and 113 deletions

.gitignore

@@ -18,3 +18,4 @@ __pycache__/
/.coverage
/.hypothesis/
/htmlcov/
.aider*

@@ -35,7 +35,9 @@ def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
label_str = str(label.label)
return f' {label.category:20} {label_str:50} {hours:-4d}h {minutes:-2d}m'
LINE_LENGTH = len(fmt_line(Label('',''), datetime.timedelta()))
LINE_LENGTH = len(fmt_line(Label('', ''), datetime.timedelta()))
def generate_report(
samples: list[RealizedActivitySample],
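Note: the reformatted fmt_line keeps the same column layout; a small standalone sketch of those format specifiers and of how LINE_LENGTH falls out of formatting an empty label (the helper name and the hours/minutes arithmetic here are illustrative, not the module's own):

    import datetime

    def fmt_line_sketch(category: str, label: str, total: datetime.timedelta) -> str:
        # Category padded to 20 columns, label to 50, hours/minutes right-aligned.
        hours, rest = divmod(total, datetime.timedelta(hours=1))
        minutes = rest // datetime.timedelta(minutes=1)
        return f' {category:20} {label:50} {hours:-4d}h {minutes:-2d}m'

    # Formatting an empty label yields the fixed line width used for separators.
    LINE_LENGTH = len(fmt_line_sketch('', '', datetime.timedelta()))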

@@ -31,6 +31,7 @@ HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
SECOND = datetime.timedelta(seconds=1)
def to_text_duration(duration: datetime.timedelta) -> str:
hours = int(duration / HOUR)
duration -= hours * HOUR
@@ -129,6 +130,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
return num_updated
def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
num_updated = 0
@@ -141,22 +143,32 @@ def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']
def all_steps(row: dict[str,int]):
def all_steps(row: dict[str, int]):
return sum(row[column] for column in COLUMNS)
steps_per_date = {
date: sum(all_steps(row) for row in rows) for date, rows in rows_per_date.items()
date: sum(all_steps(row) for row in rows)
for date, rows in rows_per_date.items()
}
duration_per_date = {
date: sum((row['play.duration'] for row in rows), start=datetime.timedelta()) for date, rows in rows_per_date.items()
date: sum((row['play.duration'] for row in rows), start=datetime.timedelta())
for date, rows in rows_per_date.items()
}
print(steps_per_date)
print(duration_per_date)
for date in steps_per_date:
was_updated_1 = vault.add_statistic(date, 'Stepmania (Steps)', int(steps_per_date[date]))
was_updated_2 = vault.add_statistic(date, 'Stepmania (Duration)', to_text_duration(duration_per_date[date]))
was_updated_1 = vault.add_statistic(
date,
'Stepmania (Steps)',
int(steps_per_date[date]),
)
was_updated_2 = vault.add_statistic(
date,
'Stepmania (Duration)',
to_text_duration(duration_per_date[date]),
)
if was_updated_1 or was_updated_2:
num_updated += 1
del date, was_updated_1, was_updated_2
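The per-date aggregation above relies on sum() with an explicit timedelta start value; a minimal sketch with made-up rows (the default start of 0 cannot be added to a datetime.timedelta):

    import datetime

    rows = [
        {'play.duration': datetime.timedelta(minutes=2)},
        {'play.duration': datetime.timedelta(minutes=3)},
    ]
    total = sum((row['play.duration'] for row in rows), start=datetime.timedelta())
    assert total == datetime.timedelta(minutes=5)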
@@ -247,27 +259,47 @@ def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
PATH_WATCHED = Path('output/show_episodes_watched.csv')
PATH_PLAYED = Path('output/games_played.csv')
PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
PATH_STEP_COUNTS = Path(
'/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
)
PATH_STEPMANIA = Path('output/stepmania.csv')
PATH_STEPMANIA = Path('output/stepmania.csv')
IMPORTERS = [
{'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
{'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
{'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
{'path': PATH_PLAYED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_games_played_content, group_category='game.name',) },
{'path': PATH_WATCHED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_watched_series_content) },
{'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
{'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
{'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
{
'path': PATH_PLAYED,
'import_rows': lambda vault, rows: import_activity_sample_csv(
vault,
rows,
map_games_played_content,
group_category='game.name',
),
},
{
'path': PATH_WATCHED,
'import_rows': lambda vault, rows: import_activity_sample_csv(
vault,
rows,
map_watched_series_content,
),
},
]
def import_data(obsidian_path: Path, dry_run=True):
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
for import_def in IMPORTERS:
if not import_def['path'].exists():
logger.warning('Skipping %s: %s is missing', import_def['import_rows'], import_def['path'])
logger.warning(
'Skipping %s: %s is missing',
import_def['import_rows'],
import_def['path'],
)
continue
rows = load_csv_file(import_def['path'])
logger.info('Loaded CSV with %d lines', len(rows))
@@ -275,7 +307,6 @@ def import_data(obsidian_path: Path, dry_run=True):
logger.info('Updated %d files', num_files_updated)
del import_def, rows
num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
logger.info('dirty files in cache: %d', num_dirty)
logger.info(
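IMPORTERS is a plain registry of {'path': ..., 'import_rows': ...} entries that import_data walks, skipping missing exports; a standalone sketch of that dispatch (the path and the stub importer are made up):

    from pathlib import Path

    importers = [
        {'path': Path('output/example.csv'), 'import_rows': lambda vault, rows: 0},
    ]

    for import_def in importers:
        if not import_def['path'].exists():
            print('Skipping missing export:', import_def['path'])
            continue
        rows = []  # stand-in for load_csv_file(import_def['path'])
        num_files_updated = import_def['import_rows'](None, rows)  # vault omitted in this sketch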

@@ -114,7 +114,7 @@ class PossibleKeys:
misc: list[str]
def is_duration_key(k,v):
def is_duration_key(k, v):
if isinstance(v, Decimal) and 'duration_seconds' in k:
return True
if isinstance(v, datetime.timedelta) and 'duration' in k:
@@ -125,11 +125,7 @@ def is_duration_key(k,v):
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
# Select data
time_keys = [k for k, v in event_data.items() if isinstance(v, datetime.date)]
duration_keys = [
k
for k, v in event_data.items()
if is_duration_key(k,v)
]
duration_keys = [k for k, v in event_data.items() if is_duration_key(k, v)]
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
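determine_possible_keys classifies event-data keys purely by value type and key name; a small sketch with an invented sample, mirroring the is_duration_key heuristic above:

    import datetime
    from decimal import Decimal

    def is_duration_key(k, v):
        if isinstance(v, Decimal) and 'duration_seconds' in k:
            return True
        if isinstance(v, datetime.timedelta) and 'duration' in k:
            return True
        return False

    event_data = {
        'play.start': datetime.date(2025, 3, 15),
        'play.duration_seconds': Decimal('183.5'),
        'song.name': 'Example Song',
    }
    time_keys = [k for k, v in event_data.items() if isinstance(v, datetime.date)]
    duration_keys = [k for k, v in event_data.items() if is_duration_key(k, v)]
    # time_keys == ['play.start'], duration_keys == ['play.duration_seconds']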

@@ -1,8 +1,6 @@
import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
from typing import Any
from personal_data.data import DeduplicateMode, Scraper
@@ -11,16 +9,19 @@ from .. import secrets
logger = logging.getLogger(__name__)
def safe_del(d: dict, *keys: str):
for key in keys:
if key in d:
del d[key]
def to_data_point(p: dict[str,Any]) ->Mapping[str, Any]:
def to_data_point(p: dict[str, Any]) -> Mapping[str, Any]:
p['owner'] = p['owner']['login']
safe_del(p, 'permissions', 'internal_tracker')
return p
@dataclasses.dataclass(frozen=True)
class Gitea(Scraper):
dataset_name = 'gitea_repos'
@@ -32,13 +33,16 @@ class Gitea(Scraper):
return False
def scrape(self) -> Iterator[Mapping[str, Any]]:
response = self.session.get('https://gitfub.space/api/v1/repos/search', params = {
#'uid':21,
'private': True,
'sort':'updated',
'order':'desc',
'access_token': secrets.gitea_access_token(),
})
response = self.session.get(
'https://gitfub.space/api/v1/repos/search',
params={
#'uid':21,
'private': True,
'sort': 'updated',
'order': 'desc',
'access_token': secrets.gitea_access_token(),
},
)
response.raise_for_status()
data = response.json()
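to_data_point flattens each repository payload before deduplication; a self-contained sketch with an invented repo dict (only the 'owner', 'permissions' and 'internal_tracker' handling follows the code above):

    def safe_del(d: dict, *keys: str):
        for key in keys:
            if key in d:
                del d[key]

    def to_data_point(p):
        p['owner'] = p['owner']['login']
        safe_del(p, 'permissions', 'internal_tracker')
        return p

    repo = {'name': 'personal-data', 'owner': {'login': 'jmaa'}, 'permissions': {'admin': True}}
    point = to_data_point(repo)
    assert point['owner'] == 'jmaa' and 'permissions' not in point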

@@ -1,18 +1,17 @@
import abc
import bs4
import dataclasses
import json
import logging
import re
import urllib.parse
import json
import dataclasses
import logging
import secrets
from collections.abc import Iterator, Mapping
from enum import Enum
from collections.abc import Iterator
import bs4
from personal_data.data import DeduplicateMode, Scraper
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class MyAnimeListAnime:
series_name_eng: str
@@ -21,6 +20,7 @@ class MyAnimeListAnime:
series_icon: urllib.parse.ParseResult
me_score: int
@dataclasses.dataclass(frozen=True)
class MyAnimeListSong:
song_name_eng: str
@@ -48,25 +48,39 @@ class MyAnimeList(Scraper):
for data_item in data_items:
yield MyAnimeListAnime(
series_name_eng= data_item.get('anime_title_eng') or data_item.get('anime_title'),
series_name= data_item.get('anime_title') or data_item.get('anime_title_eng'),
series_myanimelist_url= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_url'])),
series_icon= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_image_path'])),
me_score= data_item.get('score'),
series_name_eng=data_item.get('anime_title_eng')
or data_item.get('anime_title'),
series_name=data_item.get('anime_title')
or data_item.get('anime_title_eng'),
series_myanimelist_url=urllib.parse.urlparse(
urllib.parse.urljoin(url, data_item['anime_url']),
),
series_icon=urllib.parse.urlparse(
urllib.parse.urljoin(url, data_item['anime_image_path']),
),
me_score=data_item.get('score'),
)
del data_item
def parse_name(text: str):
match = re.fullmatch(r'^(?:\d+:\s*)?"(.*?)(?:\((.*)\))?"$', text)
return match
assert parse_name('"Soundscape"')
assert parse_name('"Soundscape (サウンドスケープ)"').group(2) is not None
assert parse_name('1: "Soundscape"')
assert parse_name('2: "Soundscape (サウンドスケープ)"').group(2) is not None
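The regex captures the English title in group 1 and an optional native title in group 2; a quick standalone check with the same pattern, reusing the example title from the asserts above:

    import re

    m = re.fullmatch(r'^(?:\d+:\s*)?"(.*?)(?:\((.*)\))?"$', '2: "Soundscape (サウンドスケープ)"')
    assert m.group(1).strip() == 'Soundscape'
    assert m.group(2).strip() == 'サウンドスケープ'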
def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
def parse_songs(
tr_elements,
song_position: str,
series_name_eng: str,
series_name: str,
):
print(series_name_eng, len(tr_elements))
for song_tr in tr_elements:
artist = song_tr.select_one('.theme-song-artist')
@@ -77,26 +91,26 @@ def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_na
e.extract()
del e
song_artist = artist.get_text().strip().removeprefix('by ')
song_name_eng = song_tr.get_text().strip()
m = parse_name(song_name_eng )
m = parse_name(song_name_eng)
song_name_eng = m.group(1).strip()
song_name_jp = m.group(2).strip() if m.group(2) else None
song= MyAnimeListSong(
song_name_eng = song_name_eng ,
song_name_jp = song_name_jp ,
song_artist = song_artist,
song_placement = song_position,
series_name_eng = series_name_eng,
series_name = series_name,
song = MyAnimeListSong(
song_name_eng=song_name_eng,
song_name_jp=song_name_jp,
song_artist=song_artist,
song_placement=song_position,
series_name_eng=series_name_eng,
series_name=series_name,
)
print(' ', song_name_eng)
yield song
@dataclasses.dataclass(frozen=True)
class MyAnimeListSongs(Scraper):
dataset_name = 'myanimelist_songs'
@@ -113,10 +127,18 @@ class MyAnimeListSongs(Scraper):
for script in soup.select('.oped-popup'):
script.extract()
yield from parse_songs(soup.select('.theme-songs.opnening table tr'),
'opening', anime.series_name_eng, anime.series_name)
yield from parse_songs(soup.select('.theme-songs.ending table tr'),
'ending', anime.series_name_eng, anime.series_name)
yield from parse_songs(
soup.select('.theme-songs.opnening table tr'),
'opening',
anime.series_name_eng,
anime.series_name,
)
yield from parse_songs(
soup.select('.theme-songs.ending table tr'),
'ending',
anime.series_name_eng,
anime.series_name,
)
def scrape(self) -> Iterator[MyAnimeListSong]:
for anime in MyAnimeList(self.session).scrape():

@@ -1,20 +1,20 @@
import dataclasses
import datetime
import logging
import datetime
from collections.abc import Iterator, Mapping
from decimal import Decimal
from pathlib import Path
import bs4
import zoneinfo
from collections.abc import Iterator, Mapping
from pathlib import Path
import bs4
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
logger = logging.getLogger(__name__)
STATS_FILE_PATH: Path = Path('/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml')
STATS_FILE_PATH: Path = Path(
'/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml',
)
@dataclasses.dataclass(frozen=True)
class Stepmania(Scraper):
@@ -33,30 +33,32 @@ class Stepmania(Scraper):
# Derp
for score in soup.select('SongScores Song HighScoreList HighScore'):
song = score.parent.parent.parent
song_path = Path(song ['Dir'].removesuffix('/'))
song_path = Path(song['Dir'].removesuffix('/'))
disqualified = score.select_one('Disqualified').get_text().strip() != '0'
if disqualified:
logger.warning('Ignored disqualified')
continue
play_start = datetime.datetime.fromisoformat(score.select_one('DateTime').get_text())
play_start = datetime.datetime.fromisoformat(
score.select_one('DateTime').get_text(),
)
play_start = play_start.replace(tzinfo=timezone).astimezone(datetime.UTC)
play_seconds = float(score.select_one('SurviveSeconds').get_text())
yield {
'song.name': song_path.stem,
'song.pack': song_path.parent.stem,
'song.difficulty': score.parent.parent['Difficulty'],
'song.grade': score.select_one('Grade').get_text(),
'play.start': play_start,
'play.duration': datetime.timedelta(seconds=play_seconds),
'score.score': float(score.select_one('PercentDP').get_text()),
'score.w1': int(score.select_one('W1').get_text()),
'score.w2': int(score.select_one('W2').get_text()),
'score.w3': int(score.select_one('W3').get_text()),
'score.w4': int(score.select_one('W4').get_text()),
'score.w5': int(score.select_one('W5').get_text()),
'score.miss': int(score.select_one('Miss').get_text()),
'song.name': song_path.stem,
'song.pack': song_path.parent.stem,
'song.difficulty': score.parent.parent['Difficulty'],
'song.grade': score.select_one('Grade').get_text(),
'play.start': play_start,
'play.duration': datetime.timedelta(seconds=play_seconds),
'score.score': float(score.select_one('PercentDP').get_text()),
'score.w1': int(score.select_one('W1').get_text()),
'score.w2': int(score.select_one('W2').get_text()),
'score.w3': int(score.select_one('W3').get_text()),
'score.w4': int(score.select_one('W4').get_text()),
'score.w5': int(score.select_one('W5').get_text()),
'score.miss': int(score.select_one('Miss').get_text()),
}
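Stats.xml timestamps are naive local times, so the scraper tags them with the local zone before converting to UTC; a minimal sketch of that step (Europe/Copenhagen is an assumed zone here, and datetime.UTC needs Python 3.11+):

    import datetime
    import zoneinfo

    local_zone = zoneinfo.ZoneInfo('Europe/Copenhagen')  # assumed local zone
    play_start = datetime.datetime.fromisoformat('2025-03-15 21:54:56')
    play_start = play_start.replace(tzinfo=local_zone).astimezone(datetime.UTC)
    assert play_start.utcoffset() == datetime.timedelta()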

@@ -10,6 +10,7 @@ CRUNCHYROLL_AUTH = secrets.load('CRUNCHYROLL_AUTH')
# FFXIV
FFXIV_CHARACTER_ID = secrets.load('FFXIV_CHARACTER_ID')
# Playstation
def playstation_psn_id():
return secrets.load_or_fail('PLAYSTATION_PSN_ID')
@@ -24,6 +25,7 @@ def pbc_account_address():
def steam_username():
return secrets.load_or_fail('STEAM_USERNAME')
# Gitea
def gitea_access_token():
return secrets.load('GITEA_ACCESS_TOKEN')

@@ -1,10 +1,8 @@
import _csv
import csv
import dataclasses
import datetime
import io
import logging
import urllib.parse
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any
@@ -91,15 +89,18 @@ def deduplicate_dicts(
def dataclass_to_dict(obj) -> dict[str, Any]:
d = dataclasses.asdict(obj)
return {k.replace('_','.',1):v for k,v in d.items()}
return {k.replace('_', '.', 1): v for k, v in d.items()}
def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
if not isinstance(d, dict) and not isinstance(d, frozendict):
d = dataclass_to_dict(d)
assert isinstance(d, dict) or isinstance(d, frozendict), 'Not a dict'
safe_values = [(k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v))) for k, v in d.items() ]
return frozendict( {k:v for k,v in safe_values if v is not None})
safe_values = [
(k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v)))
for k, v in d.items()
]
return frozendict({k: v for k, v in safe_values if v is not None})
def extend_csv_file(
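dataclass_to_dict renames only the first underscore of each field name to a dot, producing the namespaced CSV-style keys; a sketch with an invented dataclass:

    import dataclasses

    @dataclasses.dataclass(frozen=True)
    class ExampleSample:  # illustrative, not part of the module
        song_name_eng: str
        play_duration_seconds: float

    d = dataclasses.asdict(ExampleSample('Soundscape', 183.5))
    renamed = {k.replace('_', '.', 1): v for k, v in d.items()}
    assert renamed == {'song.name_eng': 'Soundscape', 'play.duration_seconds': 183.5}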

@@ -1,17 +1,16 @@
import sys
import bs4
import zipfile
import subprocess
import csv
import requests
from pathlib import Path
import personal_data.csv_import
import personal_data.main
import dataclasses
import logging
import subprocess
from pathlib import Path
import bs4
import personal_data.csv_import
import personal_data.main
logger = logging.getLogger(__name__)
@dataclasses.dataclass
class Result:
title: str
@@ -19,9 +18,8 @@ class Result:
levels: str
SESSION = personal_data.main.get_session(
[],
with_cfscrape=False, ignore_cache=False)
SESSION = personal_data.main.get_session([], with_cfscrape=False, ignore_cache=False)
def parse_results(response) -> list[Result]:
soup = bs4.BeautifulSoup(response.text, 'lxml')
@@ -36,34 +34,40 @@ def parse_results(response) -> list[Result]:
continue
id = link['href'].removeprefix('viewsimfile.php?simfileid=')
levels = cells[1].get_text().strip()
results.append(Result(title , int(id), levels))
results.append(Result(title, int(id), levels))
return results
def search_for_song(song_data) -> Result | None:
response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
data={
'songtitle': song_data['song.name_eng'],
'songartist': song_data['song.artist'],
})
response = SESSION.post(
'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
data={
'songtitle': song_data['song.name_eng'],
'songartist': song_data['song.artist'],
},
)
if results := parse_results(response):
return results[0]
response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
data={
'songtitle': song_data['song.name_eng'],
'songartist': '',
})
response = SESSION.post(
'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
data={
'songtitle': song_data['song.name_eng'],
'songartist': '',
},
)
if results := parse_results(response):
return results[0]
logger.warning('No results for %s', song_data['song.name_eng'])
return None
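The lookup above is a two-pass search with a looser fallback query; the same control flow as a tiny standalone helper (do_search and the query dicts are stand-ins, not the script's API):

    def search_with_fallback(do_search, queries):
        # Try the most specific query first, then progressively looser ones.
        for query in queries:
            if results := do_search(query):
                return results[0]
        return None

    # e.g. search_with_fallback(run_query, [
    #     {'songtitle': name, 'songartist': artist},
    #     {'songtitle': name, 'songartist': ''},
    # ])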
def download_song(song_data, output_dir: Path):
song_result = search_for_song(song_data)
if song_result is None:
return
path_zip = output_dir/f'zenius-{song_result.id}-{song_result.title}.zip'
path_zip = output_dir / f'zenius-{song_result.id}-{song_result.title}.zip'
if path_zip.exists():
logger.warning('Skipping existing file')
return
@@ -75,10 +79,11 @@ def download_song(song_data, output_dir: Path):
cmd = ['curl', '-L', '--fail', url, '-o', path_zip]
subprocess.run(cmd, check=True, capture_output=True)
def main():
csv_path = Path('./output/myanimelist_songs.csv')
output_path = Path('./output/songs')
output_path.mkdir(exist_ok=True,parents=True)
output_path.mkdir(exist_ok=True, parents=True)
songs = personal_data.csv_import.load_csv_file(csv_path)
for song in songs:
@@ -88,5 +93,3 @@ def main():
if __name__ == '__main__':
main()