Compare commits

6 commits: 4dfbde77ec ... 9058279b4e

| Author | SHA1 | Date |
| --- | --- | --- |
|  | 9058279b4e |  |
|  | 3d9c694fe8 |  |
|  | a0e8d1ec28 |  |
|  | 2a4aec9d33 |  |
|  | 965689df7a |  |
|  | dbc663cbbc |  |
.gitignore (vendored): 1 line changed
@@ -18,3 +18,4 @@ __pycache__/
 /.coverage
 /.hypothesis/
 /htmlcov/
+.aider*
@@ -35,8 +35,10 @@ def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
     label_str = str(label.label)
     return f' {label.category:20} {label_str:50} {hours:-4d}h {minutes:-2d}m'


+LINE_LENGTH = len(fmt_line(Label('', ''), datetime.timedelta()))
+

 def generate_report(
     samples: list[RealizedActivitySample],
 ) -> Iterator[str]:
@@ -31,6 +31,7 @@ HOUR = datetime.timedelta(hours=1)
 MINUTE = datetime.timedelta(minutes=1)
+SECOND = datetime.timedelta(seconds=1)


 def to_text_duration(duration: datetime.timedelta) -> str:
     hours = int(duration / HOUR)
     duration -= hours * HOUR
@@ -129,6 +130,7 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:

     return num_updated

+
 def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
     num_updated = 0

@@ -145,18 +147,28 @@ def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
         return sum(row[column] for column in COLUMNS)

     steps_per_date = {
-        date: sum(all_steps(row) for row in rows) for date, rows in rows_per_date.items()
+        date: sum(all_steps(row) for row in rows)
+        for date, rows in rows_per_date.items()
     }

     duration_per_date = {
-        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta()) for date, rows in rows_per_date.items()
+        date: sum((row['play.duration'] for row in rows), start=datetime.timedelta())
+        for date, rows in rows_per_date.items()
     }
     print(steps_per_date)
     print(duration_per_date)

     for date in steps_per_date:
-        was_updated_1 = vault.add_statistic(date, 'Stepmania (Steps)', int(steps_per_date[date]))
-        was_updated_2 = vault.add_statistic(date, 'Stepmania (Duration)', to_text_duration(duration_per_date[date]))
+        was_updated_1 = vault.add_statistic(
+            date,
+            'Stepmania (Steps)',
+            int(steps_per_date[date]),
+        )
+        was_updated_2 = vault.add_statistic(
+            date,
+            'Stepmania (Duration)',
+            to_text_duration(duration_per_date[date]),
+        )
         if was_updated_1 or was_updated_2:
             num_updated += 1
         del date, was_updated_1, was_updated_2
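A side note on the `start=datetime.timedelta()` argument used in the duration aggregation above: Python's built-in `sum()` starts from the integer `0`, which cannot be added to a `timedelta`, so an explicit zero duration has to be supplied. A minimal, self-contained illustration (not taken from the repository):

```python
import datetime

# sum() defaults to start=0, and 0 + timedelta raises TypeError,
# so an empty timedelta is passed as the starting value.
durations = [datetime.timedelta(minutes=3), datetime.timedelta(seconds=30)]
total = sum(durations, start=datetime.timedelta())
print(total)  # 0:03:30
```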
@@ -258,16 +270,36 @@ IMPORTERS = [
     {'path': PATH_WORKOUT, 'import_rows': import_workout_csv},
     {'path': PATH_STEP_COUNTS, 'import_rows': import_step_counts_csv},
     {'path': PATH_STEPMANIA, 'import_rows': import_stepmania_steps_csv},
-    {'path': PATH_PLAYED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_games_played_content, group_category='game.name',) },
-    {'path': PATH_WATCHED, 'import_rows': lambda vault, rows: import_activity_sample_csv(vault, rows, map_watched_series_content) },
+    {
+        'path': PATH_PLAYED,
+        'import_rows': lambda vault, rows: import_activity_sample_csv(
+            vault,
+            rows,
+            map_games_played_content,
+            group_category='game.name',
+        ),
+    },
+    {
+        'path': PATH_WATCHED,
+        'import_rows': lambda vault, rows: import_activity_sample_csv(
+            vault,
+            rows,
+            map_watched_series_content,
+        ),
+    },
 ]

+
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)

     for import_def in IMPORTERS:
         if not import_def['path'].exists():
-            logger.warning('Skipping %s: %s is missing', import_def['import_rows'], import_def['path'])
+            logger.warning(
+                'Skipping %s: %s is missing',
+                import_def['import_rows'],
+                import_def['path'],
+            )
             continue
         rows = load_csv_file(import_def['path'])
         logger.info('Loaded CSV with %d lines', len(rows))
@@ -275,7 +307,6 @@ def import_data(obsidian_path: Path, dry_run=True):
         logger.info('Updated %d files', num_files_updated)
         del import_def, rows


-    num_dirty = len([f for f in vault.internal_file_text_cache.values() if f.is_dirty])
-    logger.info('dirty files in cache: %d', num_dirty)
+    logger.info(
@@ -125,11 +125,7 @@ def is_duration_key(k,v):
 def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
     # Select data
     time_keys = [k for k, v in event_data.items() if isinstance(v, datetime.date)]
-    duration_keys = [
-        k
-        for k, v in event_data.items()
-        if is_duration_key(k,v)
-    ]
+    duration_keys = [k for k, v in event_data.items() if is_duration_key(k, v)]
     name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
     image_keys = [
         k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
@@ -1,26 +1,22 @@
 import dataclasses
 import datetime
 import logging
 from collections.abc import Iterator, Mapping
 from decimal import Decimal
 from typing import Any

 from personal_data.data import DeduplicateMode, Scraper
+from ..util import safe_del

 from .. import secrets

 logger = logging.getLogger(__name__)

-def safe_del(d: dict, *keys: str):
-    for key in keys:
-        if key in d:
-            del d[key]
-
 def to_data_point(p: dict[str, Any]) -> Mapping[str, Any]:
     p['owner'] = p['owner']['login']
     safe_del(p, 'permissions', 'internal_tracker')
     return p


 @dataclasses.dataclass(frozen=True)
 class Gitea(Scraper):
     dataset_name = 'gitea_repos'
@@ -32,13 +28,16 @@ class Gitea(Scraper):
         return False

     def scrape(self) -> Iterator[Mapping[str, Any]]:
-        response = self.session.get('https://gitfub.space/api/v1/repos/search', params = {
+        response = self.session.get(
+            'https://gitfub.space/api/v1/repos/search',
+            params={
                 #'uid':21,
                 'private': True,
                 'sort': 'updated',
                 'order': 'desc',
                 'access_token': secrets.gitea_access_token(),
-        })
+            },
+        )
         response.raise_for_status()

         data = response.json()
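For context on the request being reformatted above: the Gitea repository search endpoint takes the filters as query parameters and, as far as I know, returns its hits under a `data` key. A rough standalone sketch with plain `requests` (the host and parameters mirror the diff; the token placeholder and the `data` unwrapping are assumptions, not code from this repository):

```python
import requests

response = requests.get(
    'https://gitfub.space/api/v1/repos/search',
    params={
        'private': True,
        'sort': 'updated',
        'order': 'desc',
        'access_token': 'REPLACE_ME',  # placeholder; the real scraper loads this from its secrets module
    },
    timeout=30,
)
response.raise_for_status()
repos = response.json().get('data', [])  # Gitea search responses wrap results in a 'data' list
for repo in repos:
    print(repo['full_name'])
```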
@@ -1,18 +1,17 @@
 import abc
-import bs4
+import dataclasses
+import json
+import logging
+import re
 import urllib.parse
-import json
-import dataclasses
-import logging
 import secrets
+from collections.abc import Iterator, Mapping
 from enum import Enum
-from collections.abc import Iterator

+import bs4
+
 from personal_data.data import DeduplicateMode, Scraper

 logger = logging.getLogger(__name__)


 @dataclasses.dataclass(frozen=True)
 class MyAnimeListAnime:
     series_name_eng: str
@@ -21,6 +20,7 @@ class MyAnimeListAnime:
     series_icon: urllib.parse.ParseResult
     me_score: int

+
 @dataclasses.dataclass(frozen=True)
 class MyAnimeListSong:
     song_name_eng: str
@@ -48,25 +48,39 @@ class MyAnimeList(Scraper):

         for data_item in data_items:
             yield MyAnimeListAnime(
-                series_name_eng= data_item.get('anime_title_eng') or data_item.get('anime_title'),
-                series_name= data_item.get('anime_title') or data_item.get('anime_title_eng'),
-                series_myanimelist_url= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_url'])),
-                series_icon= urllib.parse.urlparse(urllib.parse.urljoin(url, data_item['anime_image_path'])),
+                series_name_eng=data_item.get('anime_title_eng')
+                or data_item.get('anime_title'),
+                series_name=data_item.get('anime_title')
+                or data_item.get('anime_title_eng'),
+                series_myanimelist_url=urllib.parse.urlparse(
+                    urllib.parse.urljoin(url, data_item['anime_url']),
+                ),
+                series_icon=urllib.parse.urlparse(
+                    urllib.parse.urljoin(url, data_item['anime_image_path']),
+                ),
                 me_score=data_item.get('score'),
             )

             del data_item

+
 def parse_name(text: str):
     match = re.fullmatch(r'^(?:\d+:\s*)?"(.*?)(?:\((.*)\))?"$', text)
     return match

+
 assert parse_name('"Soundscape"')
 assert parse_name('"Soundscape (サウンドスケープ)"').group(2) is not None
 assert parse_name('1: "Soundscape"')
 assert parse_name('2: "Soundscape (サウンドスケープ)"').group(2) is not None

-def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
+
+def parse_songs(
+    tr_elements,
+    song_position: str,
+    series_name_eng: str,
+    series_name: str,
+):
     print(series_name_eng, len(tr_elements))
     for song_tr in tr_elements:
         artist = song_tr.select_one('.theme-song-artist')
@@ -77,7 +91,6 @@ def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
             e.extract()
             del e

-
         song_artist = artist.get_text().strip().removeprefix('by ')

         song_name_eng = song_tr.get_text().strip()
@@ -97,6 +110,7 @@ def parse_songs(tr_elements, song_position: str, series_name_eng: str, series_name: str):
         print(' ', song_name_eng)
         yield song

+
 @dataclasses.dataclass(frozen=True)
 class MyAnimeListSongs(Scraper):
     dataset_name = 'myanimelist_songs'
@@ -113,10 +127,18 @@ class MyAnimeListSongs(Scraper):
         for script in soup.select('.oped-popup'):
             script.extract()

-        yield from parse_songs(soup.select('.theme-songs.opnening table tr'),
-                'opening', anime.series_name_eng, anime.series_name)
-        yield from parse_songs(soup.select('.theme-songs.ending table tr'),
-                'ending', anime.series_name_eng, anime.series_name)
+        yield from parse_songs(
+            soup.select('.theme-songs.opnening table tr'),
+            'opening',
+            anime.series_name_eng,
+            anime.series_name,
+        )
+        yield from parse_songs(
+            soup.select('.theme-songs.ending table tr'),
+            'ending',
+            anime.series_name_eng,
+            anime.series_name,
+        )

     def scrape(self) -> Iterator[MyAnimeListSong]:
         for anime in MyAnimeList(self.session).scrape():
@@ -1,20 +1,20 @@
 import dataclasses
 import datetime
 import logging
-import datetime
 from collections.abc import Iterator, Mapping
 from decimal import Decimal
 from pathlib import Path
-import bs4
-import zoneinfo
-from collections.abc import Iterator, Mapping
-from pathlib import Path

+import bs4
+
 from personal_data.data import DeduplicateMode, Scraper

 from .. import secrets

 logger = logging.getLogger(__name__)

-STATS_FILE_PATH: Path = Path('/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml')
+STATS_FILE_PATH: Path = Path(
+    '/home/jmaa/.itgmania/Save/LocalProfiles/00000000/Stats.xml',
+)


 @dataclasses.dataclass(frozen=True)
 class Stepmania(Scraper):
@@ -40,7 +40,9 @@ class Stepmania(Scraper):
                 logger.warning('Ignored disqualified')
                 continue

-            play_start = datetime.datetime.fromisoformat(score.select_one('DateTime').get_text())
+            play_start = datetime.datetime.fromisoformat(
+                score.select_one('DateTime').get_text(),
+            )
             play_start = play_start.replace(tzinfo=timezone).astimezone(datetime.UTC)

             play_seconds = float(score.select_one('SurviveSeconds').get_text())
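The timestamp handling shown above (parse, attach a zone, convert to UTC) is worth spelling out: the `DateTime` values from `Stats.xml` are parsed as naive datetimes, so they are first tagged with a local zone and only then converted. A minimal sketch under the assumption that the zone is an IANA name such as `Europe/Copenhagen` (the actual `timezone` value is defined elsewhere in the scraper and is not visible in this diff):

```python
import datetime
import zoneinfo

timezone = zoneinfo.ZoneInfo('Europe/Copenhagen')  # assumed zone, not taken from the diff

# Naive timestamp as it might appear in Stats.xml, e.g. '2024-05-01 19:23:45'
play_start = datetime.datetime.fromisoformat('2024-05-01 19:23:45')
play_start = play_start.replace(tzinfo=timezone).astimezone(datetime.UTC)
print(play_start.isoformat())  # 2024-05-01T17:23:45+00:00
```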
personal_data/fetchers/youtube.py (new file): 41 lines
@@ -0,0 +1,41 @@
+import csv
+import json
+import logging
+import subprocess
+from dataclasses import dataclass
+
+from personal_data.data import DeduplicateMode, Scraper
+from ..util import safe_del
+
+logger = logging.getLogger(__name__)
+
+PLAYLIST_ID='PLAfDVJvDKCvOMvfoTL7eW8GkWNJwd90eV'
+#PLAYLIST_ID='LL'
+
+@dataclass(frozen=True)
+class YoutubeFavoritesScraper(Scraper):
+    dataset_name: str = 'youtube_favorites'
+    deduplicate_mode: DeduplicateMode = DeduplicateMode.BY_ALL_COLUMNS
+    deduplicate_ignore_columns = []
+
+    def scrape(self) -> list[dict]:
+        """Use yt-dlp to fetch the list of favorited videos. This is a placeholder for invoking yt-dlp and parsing its output."""
+        result = subprocess.run(
+            [
+                'yt-dlp',
+                '--flat-playlist',
+                '--dump-json',
+                f'https://www.youtube.com/playlist?list={PLAYLIST_ID}',
+            ],
+            capture_output=True,
+            text=True,
+        )
+
+        if result.returncode != 0:
+            raise RuntimeError(f'Non-zero returncode in command: {result.returncode}\n\n{result.stderr}')
+
+        for line in result.stdout.splitlines():
+            data = json.loads(line)
+            data['thumbnail'] = data['thumbnails'][-1]['url']
+            safe_del(data, '_type', '_version', 'thumbnails')
+            yield data
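The new fetcher relies on one detail of yt-dlp worth noting: with `--flat-playlist --dump-json`, yt-dlp prints one JSON object per playlist entry, one per line, which is why the scraper can parse `stdout` line by line. A small standalone sketch of that invocation (the playlist URL is a placeholder, and error handling is reduced to `check=True`):

```python
import json
import subprocess

proc = subprocess.run(
    [
        'yt-dlp',
        '--flat-playlist',   # list entries without downloading or resolving each video
        '--dump-json',       # emit one JSON object per entry on stdout
        'https://www.youtube.com/playlist?list=PLACEHOLDER',
    ],
    capture_output=True,
    text=True,
    check=True,
)

entries = [json.loads(line) for line in proc.stdout.splitlines() if line.strip()]
print(f'{len(entries)} playlist entries')
```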
@@ -10,6 +10,7 @@ CRUNCHYROLL_AUTH = secrets.load('CRUNCHYROLL_AUTH')
 # FFXIV
 FFXIV_CHARACTER_ID = secrets.load('FFXIV_CHARACTER_ID')

+
 # Playstation
 def playstation_psn_id():
     return secrets.load_or_fail('PLAYSTATION_PSN_ID')
@@ -24,6 +25,7 @@ def pbc_account_address():
 def steam_username():
     return secrets.load_or_fail('STEAM_USERNAME')

+
 # Gitea
 def gitea_access_token():
     return secrets.load('GITEA_ACCESS_TOKEN')
@@ -1,10 +1,8 @@
 import _csv
 import csv
 import dataclasses
 import datetime
 import io
 import logging
 import urllib.parse
 from collections.abc import Iterable, Mapping
 from pathlib import Path
 from typing import Any
@@ -16,6 +14,13 @@ from . import csv_import, data
 logger = logging.getLogger(__name__)

+
+def safe_del(d: dict, *keys: str):
+    for key in keys:
+        if key in d:
+            del d[key]
+
+

 def equals_without_fields(
     a: Mapping[str, Any],
     b: Mapping[str, Any],
@@ -98,7 +103,10 @@ def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
     if not isinstance(d, dict) and not isinstance(d, frozendict):
         d = dataclass_to_dict(d)
     assert isinstance(d, dict) or isinstance(d, frozendict), 'Not a dict'
-    safe_values = [(k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v))) for k, v in d.items() ]
+    safe_values = [
+        (k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v)))
+        for k, v in d.items()
+    ]
     return frozendict({k: v for k, v in safe_values if v is not None})

@@ -1,17 +1,16 @@
-import sys
-import bs4
-import zipfile
-import subprocess
-import csv
-import requests
-from pathlib import Path
-import personal_data.csv_import
-import personal_data.main
 import dataclasses
 import logging
 import subprocess
 from pathlib import Path
+
+import bs4
+
+import personal_data.csv_import
+import personal_data.main
+
 logger = logging.getLogger(__name__)
+

 @dataclasses.dataclass
 class Result:
     title: str
@@ -19,9 +18,8 @@ class Result:
     levels: str


-SESSION = personal_data.main.get_session(
-    [],
-    with_cfscrape=False, ignore_cache=False)
+SESSION = personal_data.main.get_session([], with_cfscrape=False, ignore_cache=False)
+

 def parse_results(response) -> list[Result]:
     soup = bs4.BeautifulSoup(response.text, 'lxml')
|
|||
results.append(Result(title, int(id), levels))
|
||||
return results
|
||||
|
||||
|
||||
def search_for_song(song_data) -> Result | None:
|
||||
response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
|
||||
response = SESSION.post(
|
||||
'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
|
||||
data={
|
||||
'songtitle': song_data['song.name_eng'],
|
||||
'songartist': song_data['song.artist'],
|
||||
})
|
||||
},
|
||||
)
|
||||
if results := parse_results(response):
|
||||
return results[0]
|
||||
|
||||
response = SESSION.post('https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
|
||||
response = SESSION.post(
|
||||
'https://zenius-i-vanisher.com/v5.2/simfiles_search_ajax.php',
|
||||
data={
|
||||
'songtitle': song_data['song.name_eng'],
|
||||
'songartist': '',
|
||||
})
|
||||
},
|
||||
)
|
||||
if results := parse_results(response):
|
||||
return results[0]
|
||||
logger.warning('No results for %s', song_data['song.name_eng'])
|
||||
return None
|
||||
|
||||
|
||||
def download_song(song_data, output_dir: Path):
|
||||
song_result = search_for_song(song_data)
|
||||
if song_result is None:
|
||||
|
@@ -75,6 +79,7 @@ def download_song(song_data, output_dir: Path):
     cmd = ['curl', '-L', '--fail', url, '-o', path_zip]
     subprocess.run(cmd, check=True, capture_output=True)

+
 def main():
     csv_path = Path('./output/myanimelist_songs.csv')
     output_path = Path('./output/songs')
@@ -88,5 +93,3 @@ def main():

 if __name__ == '__main__':
     main()
-
-