1
0

Compare commits

..

No commits in common. "9058279b4e2d8398a150624f25a212375fff0315" and "4dfbde77ecddfc6f4a1b15072ad60013b947d461" have entirely different histories.

11 changed files with 114 additions and 221 deletions

1
.gitignore vendored
View File

@ -18,4 +18,3 @@ __pycache__/
/.coverage
/.hypothesis/
/htmlcov/
.aider*

View File

@ -35,9 +35,7 @@ def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
label_str = str(label.label)
return f' {label.category:20} {label_str:50} {hours:-4d}h {minutes:-2d}m'
LINE_LENGTH = len(fmt_line(Label('', ''), datetime.timedelta()))
LINE_LENGTH = len(fmt_line(Label('',''), datetime.timedelta()))
def generate_report(
samples: list[RealizedActivitySample],

View File

@ -31,7 +31,6 @@ HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
SECOND = datetime.timedelta(seconds=1)
def to_text_duration(duration: datetime.timedelta) -> str:
hours = int(duration / HOUR)
duration -= hours * HOUR
@ -130,7 +129,6 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
return num_updated
def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
num_updated = 0
@ -143,32 +141,22 @@ def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']
def all_steps(row: dict[str, int]):
def all_steps(row: dict[str,int]):
return sum(row[column] for column in COLUMNS)
steps_per_date = {
date: sum(all_steps(row) for row in rows)
for date, rows in rows_per_date.items()
date: sum(all_steps(row) for row in rows) for date, rows in rows_per_date.items()
}
duration_per_date = {
date: sum((row['play.duration'] for row in rows), start=datetime.timedelta())
for date, rows in rows_per_date.items()
date: sum((row['play.duration'] for row in rows), start=datetime.timedelta()) for date, rows in rows_per_date.items()
}
print(steps_per_date)
print(duration_per_date)
for date in steps_per_date:
was_updated_1 = vault.add_statistic(
date,
'Stepmania (Steps)',
int(steps_per_date[date]),
)
was_updated_2 = vault.add_statistic(
date,
'Stepmania (Duration)',
to_text_duration(duration_per_date[date]),
)
was_updated_1 = vault.add_statistic(date, 'Stepmania (Steps)', int(steps_per_date[date]))
was_updated_2 = vault.add_statistic(date, 'Stepmania (Duration)', to_text_duration(duration_per_date[date]))
if was_updated_1 or was_updated_2:
num_updated += 1
del date, was_updated_1, was_updated_2
@ -259,47 +247,27 @@ def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
PATH_WATCHED = Path('output/show_episodes_watched.csv')
PATH_PLAYED = Path('output/games_played.csv')
PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
PATH_STEP_COUNTS = Path(
'/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
)
PATH_STEPMANIA = Path('output/stepmania.csv')
PATH_STEPMANIA = Path('output/stepmania.csv')
IMPORTERS = [
{'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
{'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
{'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
{
'path': PATH_PLAYED,
'import_rows': lambda vault, rows: import_activity_sample_csv(
vault,
rows,
map_games_played_content,
group_category='game.name',
),
},
{
'path': PATH_WATCHED,
'import_rows': lambda vault, rows: import_activity_sample_csv(
vault,
rows,
map_watched_series_content,
),
},
{'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
{'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
{'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
{'path': PATH_PLAYED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_games_played_content, group_category='game.name',) },
{'path': PATH_WATCHED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_watched_series_content) },
]
def import_data(obsidian_path: Path, dry_run=True):
vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
for import_def in IMPORTERS:
if not import_def['path'].exists():
logger.warning(
'Skipping %s: %s is missing',
import_def['import_rows'],
import_def['path'],
)
logger.warning('Skipping %s: %s is missing', import_def['import_rows'], import_def['path'])
continue
rows = load_csv_file(import_def['path'])
logger.info('Loaded CSV with %d lines', len(rows))
@ -307,6 +275,7 @@ def import_data(obsidian_path: Path, dry_run=True):
logger.info('Updated %d files', num_files_updated)
del import_def, rows
num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
logger.info('dirty files in cache: %d', num_dirty)
logger.info(

View File

@ -114,7 +114,7 @@ class PossibleKeys:
misc: list[str]
def is_duration_key(k, v):
def is_duration_key(k,v):
if isinstance(v, Decimal) and 'duration_seconds' in k:
return True
if isinstance(v, datetime.timedelta) and 'duration' in k:
@ -125,7 +125,11 @@ def is_duration_key(k, v):
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
# Select data
time_keys = [k for k, v in event_data.items() if isinstance(v, datetime.date)]
duration_keys = [k for k, v in event_data.items() if is_duration_key(k, v)]
duration_keys = [
k
for k, v in event_data.items()
if is_duration_key(k,v)
]
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)

View File

@ -1,22 +1,26 @@
import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal
from typing import Any
from personal_data.data import DeduplicateMode, Scraper
from ..util import safe_del
from .. import secrets
logger = logging.getLogger(__name__)
def safe_del(d: dict, *keys: str):
    """Remove each of ``keys`` from ``d`` in place, ignoring absent keys."""
    for unwanted in keys:
        # pop() with a default never raises, so missing keys are skipped.
        d.pop(unwanted, None)
def to_data_point(p: dict[str, Any]) -> Mapping[str, Any]:
    """Flatten a repository dict into a data point.

    The dict is modified in place: ``owner`` is replaced by the value of
    ``owner['login']``, and the ``permissions`` and ``internal_tracker``
    keys are removed when present. The same dict object is returned.
    """
    p['owner'] = p['owner']['login']
    safe_del(p, 'permissions', 'internal_tracker')
    return p
@dataclasses.dataclass(frozen=True)
class Gitea(Scraper):
dataset_name = 'gitea_repos'
@ -28,16 +32,13 @@ class Gitea(Scraper):
return False
def scrape(self) -> Iterator[Mapping[str, Any]]:
response = self.session.get(
'https://gitfub.space/api/v1/repos/search',
params={
#'uid':21,
'private': True,
'sort': 'updated',
'order': 'desc',
'access_token': secrets.gitea_access_token(),
},
)
response = self.session.get('https://gitfub.space/api/v1/repos/search', params = {
#'uid':21,
'private': True,
'sort':'updated',
'order':'desc',
'access_token': secrets.gitea_access_token(),
})
response.raise_for_status()
data = response.json()

View File

@ -1,17 +1,18 @@
import dataclasses
import json
import logging
import abc
import bs4
import re
import urllib.parse
from collections.abc import Iterator
import bs4
import json
import dataclasses
import logging
import secrets
from collections.abc import Iterator, Mapping
from enum import Enum
from personal_data.data import DeduplicateMode, Scraper
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class MyAnimeListAnime:
series_name_eng: str
@ -20,7 +21,6 @@ class MyAnimeListAnime:
series_icon: urllib.parse.ParseResult
me_score: int
@dataclasses.dataclass(frozen=True)
class MyAnimeListSong:
song_name_eng: str
@ -48,39 +48,25 @@ class MyAnimeList(Scraper):
for data_item in data_items:
yield MyAnimeListAnime(
series_name_eng=data_item.get('anime_title_eng')
or data_item.get('anime_title'),
series_name=data_item.get('anime_title')
or data_item.get('anime_title_eng'),
series_myanimelist_url=urllib.parse.urlparse(
urllib.parse.urljoin(url, data_item['anime_url']),
),
series_icon=urllib.parse.urlparse(
urllib.parse.urljoin(url, data_item['anime_image_path']),
),
me_score=data_item.get('score'),
series_name_eng= data_item.get('anime_title_eng') or data_item.get('anime_title'),
series_name= data_item.get('anime_title') or data_item.get('anime_title_eng'),
series_myanimelist_url= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_url'])),
series_icon= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_image_path'])),
me_score= data_item.get('score'),
)
del data_item
def parse_name(text: str):
    """Match a double-quoted song title, optionally prefixed by ``<number>:``.

    Returns the ``re.Match`` covering all of ``text``, or ``None`` when the
    text does not fit the pattern. Group 1 is the text inside the quotes up
    to an optional trailing parenthesised part; group 2 is the content of
    those parentheses, or ``None`` when there are none.
    """
    return re.fullmatch(r'^(?:\d+:\s*)?"(.*?)(?:\((.*)\))?"$', text)
# Sanity checks for parse_name: plain and numbered titles must match, and a
# parenthesised suffix inside the quotes must be captured as group 2.
assert parse_name('"Soundscape"')
assert parse_name('"Soundscape (サウンドスケープ)"').group(2) is not None
assert parse_name('1: "Soundscape"')
assert parse_name('2: "Soundscape (サウンドスケープ)"').group(2) is not None
def parse_songs(
tr_elements,
song_position: str,
series_name_eng: str,
series_name: str,
):
def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
print(series_name_eng, len(tr_elements))
for song_tr in tr_elements:
artist = song_tr.select_one('.theme-song-artist')
@ -91,26 +77,26 @@ def parse_songs(
e.extract()
del e
song_artist = artist.get_text().strip().removeprefix('by ')
song_name_eng = song_tr.get_text().strip()
m = parse_name(song_name_eng)
m = parse_name(song_name_eng )
song_name_eng = m.group(1).strip()
song_name_jp = m.group(2).strip() if m.group(2) else None
song = MyAnimeListSong(
song_name_eng=song_name_eng,
song_name_jp=song_name_jp,
song_artist=song_artist,
song_placement=song_position,
series_name_eng=series_name_eng,
series_name=series_name,
song= MyAnimeListSong(
song_name_eng = song_name_eng ,
song_name_jp = song_name_jp ,
song_artist = song_artist,
song_placement = song_position,
series_name_eng = series_name_eng,
series_name = series_name,
)
print(' ', song_name_eng)
yield song
@dataclasses.dataclass(frozen=True)
class MyAnimeListSongs(Scraper):
dataset_name = 'myanimelist_songs'
@ -127,18 +113,10 @@ class MyAnimeListSongs(Scraper):
for script in soup.select('.oped-popup'):
script.extract()
yield from parse_songs(
soup.select('.theme-songs.opnening table tr'),
'opening',
anime.series_name_eng,
anime.series_name,
)
yield from parse_songs(
soup.select('.theme-songs.ending table tr'),
'ending',
anime.series_name_eng,
anime.series_name,
)
yield from parse_songs(soup.select('.theme-songs.opnening table tr'),
'opening', anime.series_name_eng, anime.series_name)
yield from parse_songs(soup.select('.theme-songs.ending table tr'),
'ending', anime.series_name_eng, anime.series_name)
def scrape(self) -> Iterator[MyAnimeListSong]:
for anime in MyAnimeList(self.session).scrape():

View File

@ -1,20 +1,20 @@
import dataclasses
import datetime
import logging
import zoneinfo
import datetime
from collections.abc import Iterator, Mapping
from decimal import Decimal
from pathlib import Path
import bs4
import zoneinfo
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
logger = logging.getLogger(__name__)
STATS_FILE_PATH: Path = Path(
'/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml',
)
STATS_FILE_PATH: Path = Path('/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml')
@dataclasses.dataclass(frozen=True)
class Stepmania(Scraper):
@ -33,32 +33,30 @@ class Stepmania(Scraper):
# Derp
for score in soup.select('SongScores Song HighScoreList HighScore'):
song = score.parent.parent.parent
song_path = Path(song['Dir'].removesuffix('/'))
song_path = Path(song ['Dir'].removesuffix('/'))
disqualified = score.select_one('Disqualified').get_text().strip() != '0'
if disqualified:
logger.warning('Ignored disqualified')
continue
play_start = datetime.datetime.fromisoformat(
score.select_one('DateTime').get_text(),
)
play_start = datetime.datetime.fromisoformat(score.select_one('DateTime').get_text())
play_start = play_start.replace(tzinfo=timezone).astimezone(datetime.UTC)
play_seconds = float(score.select_one('SurviveSeconds').get_text())
yield {
'song.name': song_path.stem,
'song.pack': song_path.parent.stem,
'song.difficulty': score.parent.parent['Difficulty'],
'song.grade': score.select_one('Grade').get_text(),
'play.start': play_start,
'play.duration': datetime.timedelta(seconds=play_seconds),
'score.score': float(score.select_one('PercentDP').get_text()),
'score.w1': int(score.select_one('W1').get_text()),
'score.w2': int(score.select_one('W2').get_text()),
'score.w3': int(score.select_one('W3').get_text()),
'score.w4': int(score.select_one('W4').get_text()),
'score.w5': int(score.select_one('W5').get_text()),
'score.miss': int(score.select_one('Miss').get_text()),
'song.name': song_path.stem,
'song.pack': song_path.parent.stem,
'song.difficulty': score.parent.parent['Difficulty'],
'song.grade': score.select_one('Grade').get_text(),
'play.start': play_start,
'play.duration': datetime.timedelta(seconds=play_seconds),
'score.score': float(score.select_one('PercentDP').get_text()),
'score.w1': int(score.select_one('W1').get_text()),
'score.w2': int(score.select_one('W2').get_text()),
'score.w3': int(score.select_one('W3').get_text()),
'score.w4': int(score.select_one('W4').get_text()),
'score.w5': int(score.select_one('W5').get_text()),
'score.miss': int(score.select_one('Miss').get_text()),
}

View File

@ -1,41 +0,0 @@
import csv
import json
import logging
import subprocess
from collections.abc import Iterator
from dataclasses import dataclass

from personal_data.data import DeduplicateMode, Scraper

from ..util import safe_del
logger = logging.getLogger(__name__)
PLAYLIST_ID='PLAfDVJvDKCvOMvfoTL7eW8GkWNJwd90eV'
#PLAYLIST_ID='LL'
@dataclass(frozen=True)
class YoutubeFavoritesScraper(Scraper):
    """Scraper listing the videos of the playlist ``PLAYLIST_ID`` via ``yt-dlp``."""

    dataset_name: str = 'youtube_favorites'
    deduplicate_mode: DeduplicateMode = DeduplicateMode.BY_ALL_COLUMNS
    deduplicate_ignore_columns = []

    def scrape(self) -> Iterator[dict]:
        """Yield one dict per playlist entry reported by ``yt-dlp``.

        Runs ``yt-dlp --flat-playlist --dump-json`` on the playlist URL and
        parses every non-blank output line as a JSON document. Each entry
        gets a ``thumbnail`` key holding the ``url`` of the last item in its
        ``thumbnails`` list; the ``_type``, ``_version`` and ``thumbnails``
        keys are then removed.

        Raises:
            RuntimeError: If ``yt-dlp`` exits with a non-zero return code.
        """
        result = subprocess.run(
            [
                'yt-dlp',
                '--flat-playlist',
                '--dump-json',
                f'https://www.youtube.com/playlist?list={PLAYLIST_ID}',
            ],
            capture_output=True,
            text=True,
            # The return code is checked by hand below so that stderr can be
            # included in the raised error message.
            check=False,
        )

        if result.returncode != 0:
            raise RuntimeError(f'Non-zero returncode in command: {result.returncode}\n\n{result.stderr}')

        for line in result.stdout.splitlines():
            if not line.strip():
                # json.loads('') raises, so blank output lines are skipped.
                continue
            data = json.loads(line)
            data['thumbnail'] = data['thumbnails'][-1]['url']
            safe_del(data, '_type', '_version', 'thumbnails')
            yield data

View File

@ -10,7 +10,6 @@ CRUNCHYROLL_AUTH = secrets.load('CRUNCHYROLL_AUTH')
# FFXIV
FFXIV_CHARACTER_ID = secrets.load('FFXIV_CHARACTER_ID')
# Playstation
def playstation_psn_id():
    """Return the ``PLAYSTATION_PSN_ID`` secret, loaded with ``secrets.load_or_fail``."""
    return secrets.load_or_fail('PLAYSTATION_PSN_ID')
@ -25,7 +24,6 @@ def pbc_account_address():
def steam_username():
    """Return the ``STEAM_USERNAME`` secret, loaded with ``secrets.load_or_fail``."""
    return secrets.load_or_fail('STEAM_USERNAME')
# Gitea
def gitea_access_token():
    """Return the ``GITEA_ACCESS_TOKEN`` secret, loaded with ``secrets.load``."""
    # NOTE(review): uses secrets.load rather than load_or_fail; presumably a
    # missing token is tolerated here - confirm against the secrets loader.
    return secrets.load('GITEA_ACCESS_TOKEN')

View File

@ -1,8 +1,10 @@
import _csv
import csv
import dataclasses
import datetime
import io
import logging
import urllib.parse
from collections.abc import Iterable, Mapping
from pathlib import Path
from typing import Any
@ -14,13 +16,6 @@ from . import csv_import, data
logger = logging.getLogger(__name__)
def safe_del(d: dict, *keys: str):
    """Delete the given keys from ``d`` in place; keys not present are skipped."""
    for name in keys:
        try:
            del d[name]
        except KeyError:
            # Absent key: nothing to delete, move on to the next one.
            continue
def equals_without_fields(
a: Mapping[str, Any],
b: Mapping[str, Any],
@ -96,18 +91,15 @@ def deduplicate_dicts(
def dataclass_to_dict(obj) -> dict[str, Any]:
    """Convert a dataclass instance into a dict with dotted top-level keys.

    The instance is converted with ``dataclasses.asdict``; in each resulting
    top-level key only the first underscore is replaced by a dot, so
    ``song_name_eng`` becomes ``song.name_eng``. Keys without an underscore
    are kept unchanged.
    """
    d = dataclasses.asdict(obj)
    return {k.replace('_', '.', 1): v for k, v in d.items()}
def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
    """Return a frozendict of ``d`` with values passed through the CSV helpers.

    An input that is neither a ``dict`` nor a ``frozendict`` is first
    converted with ``dataclass_to_dict``. Every value is then passed through
    ``csv_import.csv_safe_value`` followed by ``csv_import.csv_str_to_value``;
    entries whose resulting value is ``None`` are dropped.
    """
    if not isinstance(d, (dict, frozendict)):
        d = dataclass_to_dict(d)
    assert isinstance(d, (dict, frozendict)), 'Not a dict'
    safe_values = [
        (k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v)))
        for k, v in d.items()
    ]
    return frozendict({k: v for k, v in safe_values if v is not None})
def extend_csv_file(

View File

@ -1,16 +1,17 @@
import dataclasses
import logging
import subprocess
from pathlib import Path
import sys
import bs4
import zipfile
import subprocess
import csv
import requests
from pathlib import Path
import personal_data.csv_import
import personal_data.main
import dataclasses
import logging
logger = logging.getLogger(__name__)
@dataclasses.dataclass
class Result:
title: str
@ -18,8 +19,9 @@ class Result:
levels: str
SESSION = personal_data.main.get_session([], with_cfscrape=False, ignore_cache=False)
SESSION = personal_data.main.get_session(
[],
with_cfscrape=False, ignore_cache=False)
def parse_results(response) -> list[Result]:
soup = bs4.BeautifulSoup(response.text, 'lxml')
@ -34,40 +36,34 @@ def parse_results(response) -> list[Result]:
continue
id = link['href'].removeprefix('viewsimfile.php?simfileid=')
levels = cells[1].get_text().strip()
results.append(Result(title, int(id), levels))
results.append(Result(title , int(id), levels))
return results
def search_for_song(song_data) -> Result | None:
    """Search zenius-i-vanisher for a simfile matching ``song_data``.

    The search is first sent with both the song title
    (``song_data['song.name_eng']``) and artist (``song_data['song.artist']``);
    if that yields no parsed results it is repeated with an empty artist.

    Returns:
        The first result of the first search that produced any, or ``None``
        (after logging a warning) when both searches came back empty.
    """
    title = song_data['song.name_eng']

    # Most specific query first, then fall back to a title-only search.
    for artist in (song_data['song.artist'], ''):
        response = SESSION.post(
            'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
            data={
                'songtitle': title,
                'songartist': artist,
            },
        )
        if results := parse_results(response):
            return results[0]

    logger.warning('No results for %s', title)
    return None
def download_song(song_data, output_dir: Path):
song_result = search_for_song(song_data)
if song_result is None:
return
path_zip = output_dir / f'zenius-{song_result.id}-{song_result.title}.zip'
path_zip = output_dir/f'zenius-{song_result.id}-{song_result.title}.zip'
if path_zip.exists():
logger.warning('Skipping existing file')
return
@ -79,11 +75,10 @@ def download_song(song_data, output_dir: Path):
cmd = ['curl', '-L', '--fail', url, '-o', path_zip]
subprocess.run(cmd, check=True, capture_output=True)
def main():
csv_path = Path('./output/myanimelist_songs.csv')
output_path = Path('./output/songs')
output_path.mkdir(exist_ok=True, parents=True)
output_path.mkdir(exist_ok=True,parents=True)
songs = personal_data.csv_import.load_csv_file(csv_path)
for song in songs:
@ -93,3 +88,5 @@ def main():
if __name__ == '__main__':
main()