
Compare commits


6 Commits

SHA1       Date                        Message
9058279b4e 2025-03-15 22:18:30 +01:00  YouTube fixed
           All checks were successful:
           - Run Python tests (through Pytest) / Test (push): Successful in 36s
           - Verify Python project can be installed, loaded and have version checked / Test (push): Successful in 32s
3d9c694fe8 2025-03-15 21:54:56 +01:00  Ruff
a0e8d1ec28 2025-03-15 21:54:20 +01:00  style: Format list comprehension for better readability in youtube.py
2a4aec9d33 2025-03-15 21:54:19 +01:00  refactor: Simplify fetch_data and to_csv methods in YoutubeFavoritesScraper
965689df7a 2025-03-15 21:53:27 +01:00  style: Run linter and fix code formatting in youtube.py
dbc663cbbc 2025-03-15 21:53:26 +01:00  feat: Add YouTube favorites fetcher to export data as CSV
11 changed files with 224 additions and 117 deletions

.gitignore (vendored)

@@ -18,3 +18,4 @@ __pycache__/
 /.coverage
 /.hypothesis/
 /htmlcov/
+.aider*

@@ -35,7 +35,9 @@ def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
     label_str = str(label.label)
     return f' {label.category:20} {label_str:50} {hours:-4d}h {minutes:-2d}m'

-LINE_LENGTH = len(fmt_line(Label('',''), datetime.timedelta()))
+LINE_LENGTH = len(fmt_line(Label('', ''), datetime.timedelta()))

 def generate_report(
     samples: list[RealizedActivitySample],

@@ -31,6 +31,7 @@ HOUR = datetime.timedelta(hours=1)
 MINUTE = datetime.timedelta(minutes=1)
 SECOND = datetime.timedelta(seconds=1)

+
 def to_text_duration(duration: datetime.timedelta) -> str:
     hours = int(duration / HOUR)
     duration -= hours * HOUR

@@ -129,6 +130,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
     return num_updated

+
 def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0

@@ -141,22 +143,32 @@ def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
     COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']

-    def all_steps(row: dict[str,int]):
+    def all_steps(row: dict[str, int]):
         return sum(row[column] for column in COLUMNS)

     steps_per_date = {
-        date: sum(all_steps(row) for row in rows) for date, rows in rows_per_date.items()
+        date: sum(all_steps(row) for row in rows)
+        for date, rows in rows_per_date.items()
     }
     duration_per_date = {
-        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta()) for date, rows in rows_per_date.items()
+        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta())
+        for date, rows in rows_per_date.items()
     }

     print(steps_per_date)
     print(duration_per_date)

     for date in steps_per_date:
-        was_updated_1 = vault.add_statistic(date, 'Stepmania (Steps)', int(steps_per_date[date]))
-        was_updated_2 = vault.add_statistic(date, 'Stepmania (Duration)', to_text_duration(duration_per_date[date]))
+        was_updated_1 = vault.add_statistic(
+            date,
+            'Stepmania (Steps)',
+            int(steps_per_date[date]),
+        )
+        was_updated_2 = vault.add_statistic(
+            date,
+            'Stepmania (Duration)',
+            to_text_duration(duration_per_date[date]),
+        )
         if was_updated_1 or was_updated_2:
             num_updated += 1
         del date, was_updated_1, was_updated_2

@@ -247,27 +259,47 @@ def map_games_played_content(sample: RealizedActivitySample) -> EventContent:
 PATH_WATCHED = Path('output/show_episodes_watched.csv')
 PATH_PLAYED = Path('output/games_played.csv')
 PATH_WORKOUT = Path('/home/jmaa/Notes/workout.csv')
 PATH_STEP_COUNTS = Path(
     '/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
 )
 PATH_STEPMANIA = Path('output/stepmania.csv')

 IMPORTERS = [
     {'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
     {'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
     {'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
-    {'path': PATH_PLAYED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_games_played_content, group_category='game.name',) },
-    {'path': PATH_WATCHED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_watched_series_content) },
+    {
+        'path': PATH_PLAYED,
+        'import_rows': lambda vault, rows: import_activity_sample_csv(
+            vault,
+            rows,
+            map_games_played_content,
+            group_category='game.name',
+        ),
+    },
+    {
+        'path': PATH_WATCHED,
+        'import_rows': lambda vault, rows: import_activity_sample_csv(
+            vault,
+            rows,
+            map_watched_series_content,
+        ),
+    },
 ]

 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)

     for import_def in IMPORTERS:
         if not import_def['path'].exists():
-            logger.warning('Skipping %s: %s is missing', import_def['import_rows'], import_def['path'])
+            logger.warning(
+                'Skipping %s: %s is missing',
+                import_def['import_rows'],
+                import_def['path'],
+            )
             continue
         rows = load_csv_file(import_def['path'])
         logger.info('Loaded CSV with %d lines', len(rows))

@@ -275,7 +307,6 @@ def import_data(obsidian_path: Path, dry_run=True):
         logger.info('Updated %d files', num_files_updated)
         del import_def, rows

-
     num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
     logger.info('dirty files in cache: %d', num_dirty)
     logger.info(
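
One subtlety in import_data above: `read_only=dry_run and 'silent' or None` is the classic and/or ternary idiom, equivalent to `'silent' if dry_run else None`. A minimal sketch of the equivalence:

def read_only_mode(dry_run: bool):
    # and/or idiom: a truthy dry_run yields 'silent', a falsy one yields None
    return dry_run and 'silent' or None

assert read_only_mode(True) == 'silent'
assert read_only_mode(False) is None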

@@ -114,7 +114,7 @@ class PossibleKeys:
     misc: list[str]

-def is_duration_key(k,v):
+def is_duration_key(k, v):
     if isinstance(v, Decimal) and 'duration_seconds' in k:
         return True
     if isinstance(v, datetime.timedelta) and 'duration' in k:

@@ -125,11 +125,7 @@ def is_duration_key(k,v):
 def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
     # Select data
     time_keys = [k for k, v in event_data.items() if isinstance(v, datetime.date)]
-    duration_keys = [
-        k
-        for k, v in event_data.items()
-        if is_duration_key(k,v)
-    ]
+    duration_keys = [k for k, v in event_data.items() if is_duration_key(k, v)]
     name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
     image_keys = [
         k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)

@@ -1,26 +1,22 @@
 import dataclasses
-import datetime
 import logging
 from collections.abc import Iterator, Mapping
-from decimal import Decimal
 from typing import Any

 from personal_data.data import DeduplicateMode, Scraper

+from ..util import safe_del
 from .. import secrets

 logger = logging.getLogger(__name__)

-def safe_del(d: dict, *keys: str):
-    for key in keys:
-        if key in d:
-            del d[key]
-
-def to_data_point(p: dict[str,Any]) ->Mapping[str, Any]:
+def to_data_point(p: dict[str, Any]) -> Mapping[str, Any]:
     p['owner'] = p['owner']['login']
     safe_del(p, 'permissions', 'internal_tracker')
     return p

@@ -32,13 +28,16 @@ class Gitea(Scraper):
         return False

     def scrape(self) -> Iterator[Mapping[str, Any]]:
-        response = self.session.get('https://gitfub.space/api/v1/repos/search', params = {
-            #'uid':21,
-            'private': True,
-            'sort':'updated',
-            'order':'desc',
-            'access_token': secrets.gitea_access_token(),
-        })
+        response = self.session.get(
+            'https://gitfub.space/api/v1/repos/search',
+            params={
+                #'uid':21,
+                'private': True,
+                'sort': 'updated',
+                'order': 'desc',
+                'access_token': secrets.gitea_access_token(),
+            },
+        )
         response.raise_for_status()
         data = response.json()
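
For context on what to_data_point above flattens: Gitea repo-search results carry a nested owner object plus permissions and internal_tracker sub-objects, which the scraper drops. A minimal sketch with a made-up entry (field values are illustrative, not real API output):

# Hypothetical repository entry, trimmed to the fields to_data_point() touches.
repo = {
    'name': 'personal-data',
    'owner': {'login': 'jmaa'},
    'permissions': {'admin': True, 'push': True, 'pull': True},
    'internal_tracker': {'enable_time_tracker': True},
}

def safe_del(d: dict, *keys: str):
    for key in keys:
        if key in d:
            del d[key]

def to_data_point(p):
    p['owner'] = p['owner']['login']   # flatten nested owner to its login name
    safe_del(p, 'permissions', 'internal_tracker')
    return p

print(to_data_point(repo))
# {'name': 'personal-data', 'owner': 'jmaa'}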

@@ -1,18 +1,17 @@
-import abc
-import bs4
+import dataclasses
+import json
+import logging
 import re
 import urllib.parse
-import json
-import dataclasses
-import logging
-import secrets
-from collections.abc import Iterator, Mapping
-from enum import Enum
+from collections.abc import Iterator
+
+import bs4

 from personal_data.data import DeduplicateMode, Scraper

 logger = logging.getLogger(__name__)

 @dataclasses.dataclass(frozen=True)
 class MyAnimeListAnime:
     series_name_eng: str

@@ -21,6 +20,7 @@ class MyAnimeListAnime:
     series_icon: urllib.parse.ParseResult
     me_score: int

+
 @dataclasses.dataclass(frozen=True)
 class MyAnimeListSong:
     song_name_eng: str

@@ -48,25 +48,39 @@ class MyAnimeList(Scraper):
         for data_item in data_items:
             yield MyAnimeListAnime(
-                series_name_eng= data_item.get('anime_title_eng') or data_item.get('anime_title'),
-                series_name= data_item.get('anime_title') or data_item.get('anime_title_eng'),
-                series_myanimelist_url= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_url'])),
-                series_icon= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_image_path'])),
-                me_score= data_item.get('score'),
+                series_name_eng=data_item.get('anime_title_eng')
+                or data_item.get('anime_title'),
+                series_name=data_item.get('anime_title')
+                or data_item.get('anime_title_eng'),
+                series_myanimelist_url=urllib.parse.urlparse(
+                    urllib.parse.urljoin(url, data_item['anime_url']),
+                ),
+                series_icon=urllib.parse.urlparse(
+                    urllib.parse.urljoin(url, data_item['anime_image_path']),
+                ),
+                me_score=data_item.get('score'),
             )
         del data_item

 def parse_name(text: str):
     match = re.fullmatch(r'^(?:\d+:\s*)?"(.*?)(?:\((.*)\))?"$', text)
     return match

 assert parse_name('"Soundscape"')
 assert parse_name('"Soundscape (サウンドスケープ)"').group(2) is not None
 assert parse_name('1: "Soundscape"')
 assert parse_name('2: "Soundscape (サウンドスケープ)"').group(2) is not None

-def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
+def parse_songs(
+    tr_elements,
+    song_position: str,
+    series_name_eng: str,
+    series_name: str,
+):
     print(series_name_eng, len(tr_elements))
     for song_tr in tr_elements:
         artist = song_tr.select_one('.theme-song-artist')

@@ -77,26 +91,26 @@ def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
             e.extract()
         del e

         song_artist = artist.get_text().strip().removeprefix('by ')
         song_name_eng = song_tr.get_text().strip()
-        m = parse_name(song_name_eng )
+        m = parse_name(song_name_eng)
         song_name_eng = m.group(1).strip()
         song_name_jp = m.group(2).strip() if m.group(2) else None

-        song= MyAnimeListSong(
-            song_name_eng = song_name_eng ,
-            song_name_jp = song_name_jp ,
-            song_artist = song_artist,
-            song_placement = song_position,
-            series_name_eng = series_name_eng,
-            series_name = series_name,
+        song = MyAnimeListSong(
+            song_name_eng=song_name_eng,
+            song_name_jp=song_name_jp,
+            song_artist=song_artist,
+            song_placement=song_position,
+            series_name_eng=series_name_eng,
+            series_name=series_name,
         )
         print(' ', song_name_eng)
         yield song

 @dataclasses.dataclass(frozen=True)
 class MyAnimeListSongs(Scraper):
     dataset_name = 'myanimelist_songs'

@@ -113,10 +127,18 @@ class MyAnimeListSongs(Scraper):
         for script in soup.select('.oped-popup'):
             script.extract()

-        yield from parse_songs(soup.select('.theme-songs.opnening table tr'),
-            'opening', anime.series_name_eng, anime.series_name)
-        yield from parse_songs(soup.select('.theme-songs.ending table tr'),
-            'ending', anime.series_name_eng, anime.series_name)
+        yield from parse_songs(
+            soup.select('.theme-songs.opnening table tr'),
+            'opening',
+            anime.series_name_eng,
+            anime.series_name,
+        )
+        yield from parse_songs(
+            soup.select('.theme-songs.ending table tr'),
+            'ending',
+            anime.series_name_eng,
+            anime.series_name,
+        )

     def scrape(self) -> Iterator[MyAnimeListSong]:
         for anime in MyAnimeList(self.session).scrape():
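
The parse_name pattern above captures the English song title in group 1 and an optional parenthesised native-language title in group 2, with an optional leading track number. A quick demonstration:

import re

def parse_name(text: str):
    return re.fullmatch(r'^(?:\d+:\s*)?"(.*?)(?:\((.*)\))?"$', text)

m = parse_name('2: "Soundscape (サウンドスケープ)"')
print(m.group(1).strip())  # Soundscape
print(m.group(2))          # サウンドスケープ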

@@ -1,20 +1,20 @@
 import dataclasses
 import datetime
 import logging
-import datetime
-from collections.abc import Iterator, Mapping
-from decimal import Decimal
-from pathlib import Path
-import bs4
 import zoneinfo
+from collections.abc import Iterator, Mapping
+from pathlib import Path
+
+import bs4

 from personal_data.data import DeduplicateMode, Scraper

-from .. import secrets
-
 logger = logging.getLogger(__name__)

-STATS_FILE_PATH: Path = Path('/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml')
+STATS_FILE_PATH: Path = Path(
+    '/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml',
+)

 @dataclasses.dataclass(frozen=True)
 class Stepmania(Scraper):

@@ -33,30 +33,32 @@ class Stepmania(Scraper):
         # Derp
         for score in soup.select('SongScores Song HighScoreList HighScore'):
             song = score.parent.parent.parent
-            song_path = Path(song ['Dir'].removesuffix('/'))
+            song_path = Path(song['Dir'].removesuffix('/'))

             disqualified = score.select_one('Disqualified').get_text().strip() != '0'
             if disqualified:
                 logger.warning('Ignored disqualified')
                 continue

-            play_start = datetime.datetime.fromisoformat(score.select_one('DateTime').get_text())
+            play_start = datetime.datetime.fromisoformat(
+                score.select_one('DateTime').get_text(),
+            )
             play_start = play_start.replace(tzinfo=timezone).astimezone(datetime.UTC)
             play_seconds = float(score.select_one('SurviveSeconds').get_text())

             yield {
                 'song.name': song_path.stem,
                 'song.pack': song_path.parent.stem,
                 'song.difficulty': score.parent.parent['Difficulty'],
                 'song.grade': score.select_one('Grade').get_text(),
                 'play.start': play_start,
                 'play.duration': datetime.timedelta(seconds=play_seconds),
                 'score.score': float(score.select_one('PercentDP').get_text()),
                 'score.w1': int(score.select_one('W1').get_text()),
                 'score.w2': int(score.select_one('W2').get_text()),
                 'score.w3': int(score.select_one('W3').get_text()),
                 'score.w4': int(score.select_one('W4').get_text()),
                 'score.w5': int(score.select_one('W5').get_text()),
                 'score.miss': int(score.select_one('Miss').get_text()),
             }
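
The selectors in scrape() imply roughly this Stats.xml shape: a Song element carrying a Dir attribute, a Steps element carrying Difficulty, and HighScore children holding the individual fields. A minimal sketch built only from those selectors (a real ITGmania save contains many more elements, and the values below are made up):

import bs4

STATS_XML = '''
<Stats>
  <SongScores>
    <Song Dir="Songs/MyPack/MySong/">
      <Steps Difficulty="Challenge">
        <HighScoreList>
          <HighScore>
            <Grade>Tier03</Grade>
            <PercentDP>0.912345</PercentDP>
            <SurviveSeconds>95.2</SurviveSeconds>
            <Disqualified>0</Disqualified>
            <DateTime>2025-03-15 21:00:00</DateTime>
            <W1>120</W1><W2>40</W2><W3>10</W3><W4>2</W4><W5>1</W5>
            <Miss>3</Miss>
          </HighScore>
        </HighScoreList>
      </Steps>
    </Song>
  </SongScores>
</Stats>
'''

soup = bs4.BeautifulSoup(STATS_XML, 'lxml-xml')
score = soup.select_one('SongScores Song HighScoreList HighScore')
song = score.parent.parent.parent           # the <Song> element, as in scrape()
print(song['Dir'])                          # Songs/MyPack/MySong/
print(score.parent.parent['Difficulty'])    # Challenge (the <Steps> element)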

@@ -0,0 +1,41 @@
+import csv
+import json
+import logging
+import subprocess
+from dataclasses import dataclass
+
+from personal_data.data import DeduplicateMode, Scraper
+
+from ..util import safe_del
+
+logger = logging.getLogger(__name__)
+
+PLAYLIST_ID='PLAfDVJvDKCvOMvfoTL7eW8GkWNJwd90eV'
+#PLAYLIST_ID='LL'
+
+@dataclass(frozen=True)
+class YoutubeFavoritesScraper(Scraper):
+    dataset_name: str = 'youtube_favorites'
+    deduplicate_mode: DeduplicateMode = DeduplicateMode.BY_ALL_COLUMNS
+    deduplicate_ignore_columns = []
+
+    def scrape(self) -> list[dict]:
+        """Use yt-dlp to fetch the list of favorited videos. This is a placeholder for invoking yt-dlp and parsing its output."""
+        result = subprocess.run(
+            [
+                'yt-dlp',
+                '--flat-playlist',
+                '--dump-json',
+                f'https://www.youtube.com/playlist?list={PLAYLIST_ID}',
+            ],
+            capture_output=True,
+            text=True,
+        )
+        if result.returncode != 0:
+            raise RuntimeError(f'Non-zero returncode in command: {result.returncode}\n\n{result.stderr}')
+
+        for line in result.stdout.splitlines():
+            data = json.loads(line)
+            data['thumbnail'] = data['thumbnails'][-1]['url']
+            safe_del(data, '_type', '_version', 'thumbnails')
+            yield data
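
Each line of `yt-dlp --flat-playlist --dump-json` output is a standalone JSON object describing one playlist entry. A rough sketch of the per-line handling done by scrape() above (field values are illustrative, not real yt-dlp output):

import json

# One made-up dump-json line; real entries carry many more fields.
line = json.dumps({
    '_type': 'url',
    'id': 'dQw4w9WgXcQ',
    'title': 'Example video',
    'url': 'https://www.youtube.com/watch?v=dQw4w9WgXcQ',
    'thumbnails': [
        {'url': 'https://i.ytimg.com/vi/dQw4w9WgXcQ/default.jpg'},
        {'url': 'https://i.ytimg.com/vi/dQw4w9WgXcQ/maxresdefault.jpg'},
    ],
})

data = json.loads(line)
data['thumbnail'] = data['thumbnails'][-1]['url']  # keep only the last thumbnail URL
for key in ('_type', '_version', 'thumbnails'):    # mirrors safe_del() in the scraper
    data.pop(key, None)
print(data)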

@@ -10,6 +10,7 @@ CRUNCHYROLL_AUTH = secrets.load('CRUNCHYROLL_AUTH')
 # FFXIV
 FFXIV_CHARACTER_ID = secrets.load('FFXIV_CHARACTER_ID')

+
 # Playstation
 def playstation_psn_id():
     return secrets.load_or_fail('PLAYSTATION_PSN_ID')

@@ -24,6 +25,7 @@ def pbc_account_address():
 def steam_username():
     return secrets.load_or_fail('STEAM_USERNAME')

+
 # Gitea
 def gitea_access_token():
     return secrets.load('GITEA_ACCESS_TOKEN')

@@ -1,10 +1,8 @@
 import _csv
 import csv
 import dataclasses
-import datetime
 import io
 import logging
-import urllib.parse
 from collections.abc import Iterable, Mapping
 from pathlib import Path
 from typing import Any

@@ -16,6 +14,13 @@ from . import csv_import, data

 logger = logging.getLogger(__name__)

+
+def safe_del(d: dict, *keys: str):
+    for key in keys:
+        if key in d:
+            del d[key]
+
+
 def equals_without_fields(
     a: Mapping[str, Any],
     b: Mapping[str, Any],

@@ -91,15 +96,18 @@ def deduplicate_dicts(

 def dataclass_to_dict(obj) -> dict[str, Any]:
     d = dataclasses.asdict(obj)
-    return {k.replace('_','.',1):v for k,v in d.items()}
+    return {k.replace('_', '.', 1): v for k, v in d.items()}

 def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
     if not isinstance(d, dict) and not isinstance(d, frozendict):
         d = dataclass_to_dict(d)
     assert isinstance(d, dict) or isinstance(d, frozendict), 'Not a dict'
-    safe_values = [(k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v))) for k, v in d.items() ]
-    return frozendict( {k:v for k,v in safe_values if v is not None})
+    safe_values = [
+        (k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v)))
+        for k, v in d.items()
+    ]
+    return frozendict({k: v for k, v in safe_values if v is not None})

 def extend_csv_file(
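
The renaming rule in dataclass_to_dict above replaces only the first underscore with a dot, which is how dataclass fields become the dotted CSV column names seen elsewhere in this diff ('score.w1', 'song.name_eng', and so on). A minimal sketch with a hypothetical dataclass:

import dataclasses
from typing import Any

def dataclass_to_dict(obj) -> dict[str, Any]:
    d = dataclasses.asdict(obj)
    return {k.replace('_', '.', 1): v for k, v in d.items()}

@dataclasses.dataclass
class SongRow:  # hypothetical example type, not part of this repository
    song_name_eng: str
    score_w1: int

print(dataclass_to_dict(SongRow('Soundscape', 120)))
# {'song.name_eng': 'Soundscape', 'score.w1': 120}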

@@ -1,17 +1,16 @@
-import sys
-import bs4
-import zipfile
-import subprocess
-import csv
-import requests
-from pathlib import Path
-import personal_data.csv_import
-import personal_data.main
 import dataclasses
 import logging
+import subprocess
+from pathlib import Path
+
+import bs4
+
+import personal_data.csv_import
+import personal_data.main

 logger = logging.getLogger(__name__)

 @dataclasses.dataclass
 class Result:
     title: str

@@ -19,9 +18,8 @@ class Result:
     levels: str

-SESSION = personal_data.main.get_session(
-    [],
-    with_cfscrape=False, ignore_cache=False)
+SESSION = personal_data.main.get_session([], with_cfscrape=False, ignore_cache=False)

 def parse_results(response) -> list[Result]:
     soup = bs4.BeautifulSoup(response.text, 'lxml')

@@ -36,34 +34,40 @@ def parse_results(response) -> list[Result]:
             continue
         id = link['href'].removeprefix('viewsimfile.php?simfileid=')
         levels = cells[1].get_text().strip()
-        results.append(Result(title , int(id), levels))
+        results.append(Result(title, int(id), levels))

     return results

 def search_for_song(song_data) -> Result | None:
-    response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
-        data={
-            'songtitle': song_data['song.name_eng'],
-            'songartist': song_data['song.artist'],
-        })
+    response = SESSION.post(
+        'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
+        data={
+            'songtitle': song_data['song.name_eng'],
+            'songartist': song_data['song.artist'],
+        },
+    )
     if results := parse_results(response):
         return results[0]

-    response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
-        data={
-            'songtitle': song_data['song.name_eng'],
-            'songartist': '',
-        })
+    response = SESSION.post(
+        'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
+        data={
+            'songtitle': song_data['song.name_eng'],
+            'songartist': '',
+        },
+    )
     if results := parse_results(response):
         return results[0]

     logger.warning('No results for %s', song_data['song.name_eng'])
     return None

 def download_song(song_data, output_dir: Path):
     song_result = search_for_song(song_data)
     if song_result is None:
         return
-    path_zip = output_dir/f'zenius-{song_result.id}-{song_result.title}.zip'
+    path_zip = output_dir / f'zenius-{song_result.id}-{song_result.title}.zip'
     if path_zip.exists():
         logger.warning('Skipping existing file')
         return

@@ -75,10 +79,11 @@ def download_song(song_data, output_dir: Path):
     cmd = ['curl', '-L', '--fail', url, '-o', path_zip]
     subprocess.run(cmd, check=True, capture_output=True)

+
 def main():
     csv_path = Path('./output/myanimelist_songs.csv')
     output_path = Path('./output/songs')
-    output_path.mkdir(exist_ok=True,parents=True)
+    output_path.mkdir(exist_ok=True, parents=True)

     songs = personal_data.csv_import.load_csv_file(csv_path)
     for song in songs:

@@ -88,5 +93,3 @@ def main():

-
-
 if __name__ == '__main__':
     main()