1
0

Compare commits

..

No commits in common. "c74920e478098caed97526552dfb367e1aad4b71" and "b655759f06a9380cca4c4b29aee6d7923d617562" have entirely different histories.

15 changed files with 52 additions and 289 deletions

View File

@ -1,78 +0,0 @@
"""Obsidian Import.
Sub-module for importing time-based data into Obsidian.
"""
import datetime
from logging import getLogger
from pathlib import Path
from typing import Any
from personal_data.util import load_csv_file
from .obsidian import ObsidianVault
logger = getLogger(__name__)
def import_workout_csv(vault: 'ObsidianVault', rows: list[dict[str, Any]]) -> int:
    """Import workout CSV rows into the vault as daily statistics.

    Each row must have a 'Date' key; known measurement columns are copied
    into the daily note under their canonical statistic name, optionally
    suffixed with a unit. When a column is renamed, the statistic under
    the old name is cleared (set to None) so stale keys don't linger.

    Returns the number of rows that caused at least one file update.
    """
    # Maps CSV column name -> (vault statistic key, unit suffix).
    # Loop-invariant, so build it once instead of once per row.
    mapping = {
        'Cycling (mins)': ('Cycling (Duration)', 'minutes'),
        'Cycling (kcals)': ('Cycling (kcals)', ''),
        'Weight (Kg)': ('Weight (Kg)', ''),
    }
    num_updated = 0
    for row in rows:
        date = row['Date']
        was_updated = False
        for input_key, (output_key, unit) in mapping.items():
            value = row.get(input_key)
            if value is None:
                continue
            if unit:
                value = f'{value} {unit}'
            was_updated |= vault.add_statistic(date, output_key, value)
            if input_key != output_key:
                # Remove the statistic stored under its old (input) name.
                was_updated |= vault.add_statistic(date, input_key, None)
        if was_updated:
            num_updated += 1
    return num_updated
def import_step_counts_csv(vault: 'ObsidianVault', rows: list[dict[str, Any]]) -> int:
    """Aggregate step counts per day and write them into the vault.

    Rows are grouped by the calendar date of their 'Start' timestamp and
    their 'Steps' values summed; days whose total falls below MINIMUM are
    skipped as noise (e.g. a device worn only briefly).

    Returns the number of daily notes that were updated.
    """
    MINIMUM = 300  # Ignore days with implausibly few recorded steps.

    rows_per_day: dict[Any, list[dict[str, Any]]] = {}
    for row in rows:
        date = row['Start'].date()
        # Single lookup: setdefault returns the list to append to.
        rows_per_day.setdefault(date, []).append(row)

    steps_per_day = {
        date: sum(day_row['Steps'] for day_row in day_rows)
        for date, day_rows in rows_per_day.items()
    }

    num_updated = 0
    for date, steps in steps_per_day.items():
        if steps < MINIMUM:
            continue
        if vault.add_statistic(date, 'Steps', steps):
            num_updated += 1
    return num_updated
def import_data(obsidian_path: Path, dry_run=True):
    """Load the step-count CSV and import it into the Obsidian vault.

    When dry_run is true (the default), the vault is opened in 'silent'
    read-only mode: writes are logged but no files are modified.
    """
    # 'silent' if dry_run else None — the old `and/or` idiom is fragile
    # because it breaks whenever the truthy branch is itself falsy.
    vault = ObsidianVault(obsidian_path, read_only='silent' if dry_run else None)

    # TODO(review): data path is hard-coded; make it a parameter/CLI flag.
    #data_path = Path('/home/jmaa/Notes/workout.csv')
    data_path = Path('/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv')

    rows = load_csv_file(data_path)
    logger.info('Loaded CSV with %d lines', len(rows))

    #num_updated = import_workout_csv(vault, rows)
    num_updated = import_step_counts_csv(vault, rows)
    logger.info('Updated %d files', num_updated)

View File

@ -1,31 +0,0 @@
import argparse
import logging
from pathlib import Path
from . import import_data
logger = logging.getLogger(__name__)
def parse_arguments():
    """Parse CLI arguments: --vault (required path) and --yes (disable dry run)."""
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument('--vault', type=Path, required=True)
    # --yes flips dry_run off; without it the importer only logs.
    argument_parser.add_argument('--yes', dest='dry_run', action='store_false')
    return argument_parser.parse_args()
def main():
    """CLI entry point: configure logging, parse arguments, run the import."""
    # Make the importer's INFO-level progress messages visible.
    logging.basicConfig()
    logging.getLogger('obsidian_import').setLevel('INFO')

    arguments = parse_arguments()
    if arguments.dry_run:
        logger.warning('Dry run')
    import_data(arguments.vault, dry_run=arguments.dry_run)
    if arguments.dry_run:
        logger.warning('Dry run: Use --yes to execute')


if __name__ == '__main__':
    main()

View File

@ -1,100 +0,0 @@
import datetime
import json
from decimal import Decimal
from logging import getLogger
from pathlib import Path
from typing import Any
import frontmatter
logger = getLogger(__name__)
StatisticKey = str
class ObsidianVault:
def __init__(self, vault_path: Path, read_only: bool = 'silent'):
self.vault_path = vault_path
assert (self.vault_path / '.obsidian').exists(), 'Not an Obsidian Vault'
with open(self.vault_path / '.obsidian' / 'daily-notes.json') as f:
daily_notes_config = json.load(f)
self.daily_folder = daily_notes_config['folder']
self.path_format = daily_notes_config['format']
self.template_file_path = daily_notes_config['template']
self.read_only = read_only
def get_statistic(
self, date: datetime.date, statistic_key: StatisticKey,
) -> Any | None:
try:
with open(self._date_file_path(date)) as f:
data = frontmatter.load(f)
except FileNotFoundError:
return None
return data.metadata.get(statistic_key)
def add_statistic(
self, date: datetime.date, statistic_key: StatisticKey, amount: Any,
) -> bool:
if self.read_only == 'silent':
logger.info(
'Real only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
date,
statistic_key,
amount,
)
return False
self._create_date_if_not_present(date)
with open(self._date_file_path(date)) as f:
data = frontmatter.load(f)
if isinstance(amount, Decimal):
amount = float(amount)
if data.metadata.get(statistic_key) == amount:
return False
data.metadata[statistic_key] = amount
if amount is None:
del data.metadata[statistic_key]
with open(self._date_file_path(date), 'wb') as f:
frontmatter.dump(data, f)
return True
def add_event(self, date: datetime.date, verb: str, subject: str) -> None:
if self.read_only == 'silent':
logger.info(
'Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb,
)
return
self._create_date_if_not_present(date)
# TODO
def _create_date_if_not_present(self, date: datetime.date):
date_file = self._date_file_path(date)
if date_file.exists():
return
logger.info('File "%s" doesn\'t exist, creating...', date)
with open(self._daily_template_path()) as f:
template_text = f.read()
with open(date_file, 'w') as f:
f.write(template_text)
def _date_file_path(self, date: datetime.date):
path = (
self.path_format.replace('YYYY', str(date.year))
.replace('MM', f'{date.month:02d}')
.replace('DD', f'{date.day:02d}')
)
return (self.vault_path / self.daily_folder / path).with_suffix('.md')
def _daily_template_path(self):
return (self.vault_path / self.template_file_path).with_suffix('.md')

View File

@ -1 +1 @@
__version__ = '0.1.46' __version__ = '0.1.45'

View File

@ -12,7 +12,6 @@ from .. import secrets
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_client(): def get_client():
assert secrets.KUCOIN_KEY, 'Missing secret: KUCOIN_KEY' assert secrets.KUCOIN_KEY, 'Missing secret: KUCOIN_KEY'
assert secrets.KUCOIN_SECRET, 'Missing secret: KUCOIN_SECRET' assert secrets.KUCOIN_SECRET, 'Missing secret: KUCOIN_SECRET'

View File

@ -2,11 +2,10 @@ import dataclasses
import datetime import datetime
import logging import logging
import re import re
import bs4 import bs4
from .. import html_util, parse_util, secrets
from ..data import DeduplicateMode, Scraper from ..data import DeduplicateMode, Scraper
from .. import secrets, parse_util, html_util
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -52,9 +51,7 @@ class LodestoneAchievementScraper(Scraper):
time_acquired, time_acquired,
).group(1) ).group(1)
time_acquired = int(time_acquired) time_acquired = int(time_acquired)
time_acquired = datetime.datetime.fromtimestamp( time_acquired = datetime.datetime.fromtimestamp(time_acquired,tz=datetime.UTC)
time_acquired, tz=datetime.UTC,
)
trophy_desc = ( trophy_desc = (
entry.select_one('.entry__activity__txt').get_text().strip() entry.select_one('.entry__activity__txt').get_text().strip()
) )

View File

@ -1,47 +1,42 @@
import dataclasses import dataclasses
import datetime
import logging import logging
from collections.abc import Iterator import re
import bs4
from typing import Any from typing import Any
from collections.abc import Iterator
from jellyfin_apiclient_python import JellyfinClient from jellyfin_apiclient_python import JellyfinClient
from .. import _version, secrets
from ..data import DeduplicateMode, Scraper from ..data import DeduplicateMode, Scraper
from .. import secrets, parse_util, html_util, _version
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
URL_SITE_ROOT = 'https://steamcommunity.com/' URL_SITE_ROOT = 'https://steamcommunity.com/'
URL_GAME_ACHIVEMENTS = URL_SITE_ROOT + 'id/{username}/stats/appid/{appid}' URL_GAME_ACHIVEMENTS = URL_SITE_ROOT+'id/{username}/stats/appid/{appid}'
FORMAT_DATE_HEADER = '%d/%m/%YYYY' FORMAT_DATE_HEADER = '%d/%m/%YYYY'
def iterate_series(client): def iterate_series(client):
result = client.jellyfin.user_items( result = client.jellyfin.user_items(params = {
params={
'includeItemTypes': 'Series', 'includeItemTypes': 'Series',
'parentId': 'a656b907eb3a73532e40e44b968d0225', 'parentId': 'a656b907eb3a73532e40e44b968d0225',
'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab', 'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
}, })
)
yield from result['Items'] yield from result['Items']
def iterate_watched_episodes_of_series(client, series_id: str): def iterate_watched_episodes_of_series(client, series_id: str):
result = client.jellyfin.user_items( result = client.jellyfin.user_items(params = {
params={
'filters': 'IsPlayed', 'filters': 'IsPlayed',
'recursive': True, 'recursive': True,
'includeItemTypes': 'Episode', 'includeItemTypes': 'Episode',
'parentId': series_id, 'parentId': series_id,
'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab', 'userId': 'dd95c1085c1b4e83ba8e8853fbc644ab',
'fields': 'AirTime', 'fields': 'AirTime',
}, })
)
yield from result['Items'] yield from result['Items']
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class JellyfinWatchHistoryScraper(Scraper): class JellyfinWatchHistoryScraper(Scraper):
dataset_name = 'show_episodes_watched' dataset_name = 'show_episodes_watched'
@ -50,15 +45,12 @@ class JellyfinWatchHistoryScraper(Scraper):
def scrape(self) -> Iterator[dict[str, Any]]: def scrape(self) -> Iterator[dict[str, Any]]:
client = JellyfinClient() client = JellyfinClient()
client.config.app( client.config.app('personal_data', _version.__version__,
'personal_data', _version.__version__, 'test_machine', 'unique_id_1', 'test_machine', 'unique_id_1')
)
client.config.data['auth.ssl'] = False client.config.data["auth.ssl"] = False
client.auth.connect_to_address(secrets.JELLYFIN_URL) client.auth.connect_to_address(secrets.JELLYFIN_URL)
client.auth.login( client.auth.login(secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD)
secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD,
)
for series_data in iterate_series(client): for series_data in iterate_series(client):
series_id = series_data['Id'] series_id = series_data['Id']
@ -78,3 +70,4 @@ class JellyfinWatchHistoryScraper(Scraper):
del episode_data del episode_data
del series_data, series_id del series_data, series_id

View File

@ -4,13 +4,13 @@ import re
from collections.abc import Iterator from collections.abc import Iterator
import bs4 import bs4
import requests_util
import personal_data.html_util import personal_data.html_util
from personal_data import secrets from personal_data import secrets
from personal_data.data import DeduplicateMode, Scraper from personal_data.data import DeduplicateMode, Scraper
from .. import parse_util from .. import parse_util
import requests_util
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -27,7 +27,6 @@ def game_psnprofiles_id_from_url(relative_url: str) -> int:
MAX_NUMBER_GAMES_TO_PARSE = 1000 MAX_NUMBER_GAMES_TO_PARSE = 1000
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class PsnProfilesScraper(Scraper): class PsnProfilesScraper(Scraper):
dataset_name = 'games_played_playstation' dataset_name = 'games_played_playstation'
@ -58,7 +57,7 @@ class PsnProfilesScraper(Scraper):
requests_util.setup_limiter( requests_util.setup_limiter(
self.session, self.session,
URL_API_ROOT, URL_API_ROOT,
per_minute=5, per_minute = 5,
) )
def _scrape_games_overview(self) -> Iterator[dict]: def _scrape_games_overview(self) -> Iterator[dict]:

View File

@ -2,19 +2,18 @@ import dataclasses
import datetime import datetime
import logging import logging
import re import re
from collections.abc import Iterator
from typing import Any
import bs4 import bs4
from typing import Any
from collections.abc import Iterator
from .. import html_util, parse_util, secrets
from ..data import DeduplicateMode, Scraper from ..data import DeduplicateMode, Scraper
from .. import secrets, parse_util, html_util
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
URL_SITE_ROOT = 'https://steamcommunity.com/' URL_SITE_ROOT = 'https://steamcommunity.com/'
URL_GAME_ACHIVEMENTS = URL_SITE_ROOT + 'id/{username}/stats/appid/{appid}' URL_GAME_ACHIVEMENTS = URL_SITE_ROOT+'id/{username}/stats/appid/{appid}'
FORMAT_DATE_HEADER = '%d/%m/%YYYY' FORMAT_DATE_HEADER = '%d/%m/%YYYY'
@ -40,15 +39,14 @@ class SteamAchievementScraper(Scraper):
# Parse data # Parse data
soup = bs4.BeautifulSoup(response.content, 'lxml') soup = bs4.BeautifulSoup(response.content, 'lxml')
game_name: str = re.match( game_name: str = re.match(r'Steam Community :: (.+) :: Jmaa', soup.head.title.get_text()).group(1)
r'Steam Community :: (.+) :: Jmaa', soup.head.title.get_text(),
).group(1)
soup = html_util.normalize_soup_slightly( soup = html_util.normalize_soup_slightly(
soup, soup,
classes=False, classes=False,
) )
for entry in soup.select('.achieveRow'): for entry in soup.select('.achieveRow'):
trophy_name: str = entry.select_one('h3').get_text() trophy_name: str = entry.select_one('h3').get_text()
trophy_desc: str = entry.select_one('h5').get_text() trophy_desc: str = entry.select_one('h5').get_text()
@ -57,9 +55,7 @@ class SteamAchievementScraper(Scraper):
time_acquired_html: str = entry.select_one('.achieveUnlockTime') time_acquired_html: str = entry.select_one('.achieveUnlockTime')
if time_acquired_html is None: if time_acquired_html is None:
continue continue
time_acquired_text: str = ( time_acquired_text: str = time_acquired_html.get_text().strip().removeprefix('Unlocked ')
time_acquired_html.get_text().strip().removeprefix('Unlocked ')
)
time_acquired: datetime.datetime = parse_util.parse_time(time_acquired_text) time_acquired: datetime.datetime = parse_util.parse_time(time_acquired_text)
yield { yield {

View File

@ -8,15 +8,14 @@ import dataclasses
from decimal import Decimal from decimal import Decimal
import bs4 import bs4
import requests_util
import personal_data.html_util import personal_data.html_util
import personal_data.parse_util import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper from personal_data.data import DeduplicateMode, Scraper
import requests_util
URL_API_ROOT = 'https://tavex.dk/' URL_API_ROOT = 'https://tavex.dk/'
def parse_dkk_price(dkk: str) -> Decimal: def parse_dkk_price(dkk: str) -> Decimal:
if dkk.strip() == '-': if dkk.strip() == '-':
return None return None
@ -39,7 +38,7 @@ class TavexScraperBase(Scraper):
requests_util.setup_limiter( requests_util.setup_limiter(
self.session, self.session,
URL_API_ROOT, URL_API_ROOT,
per_minute=5, per_minute = 5,
) )
def scrape(self): def scrape(self):

View File

@ -43,10 +43,8 @@ if cfscrape:
class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper): class CachedCfScrape(requests_cache.CacheMixin, cfscrape.CloudflareScraper):
pass pass
CACHE_EXPIRE_DEFAULT = datetime.timedelta(days=7) CACHE_EXPIRE_DEFAULT = datetime.timedelta(days=7)
def get_session( def get_session(
cookiejar: Sequence, cookiejar: Sequence,
*, *,
@ -60,9 +58,7 @@ def get_session(
return requests.Session() return requests.Session()
if cfscrape: if cfscrape:
session_class = CachedCfScrape session_class = CachedCfScrape
session = session_class( session = session_class(OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT)
OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT,
)
for cookie in cookiejar: for cookie in cookiejar:
session.cookies.set_cookie(cookie) session.cookies.set_cookie(cookie)
return session return session
@ -70,7 +66,6 @@ def get_session(
def available_scrapers() -> list[type[data.Scraper]]: def available_scrapers() -> list[type[data.Scraper]]:
from . import fetchers # noqa from . import fetchers # noqa
subclasses = [] subclasses = []
class_queue = [data.Scraper] class_queue = [data.Scraper]
while class_queue: while class_queue:

View File

@ -39,7 +39,7 @@ def parse_response_datetime(response) -> datetime.datetime:
LOCAL_TIMEZONE = datetime.datetime.now(datetime.UTC).astimezone().tzinfo LOCAL_TIMEZONE = datetime.datetime.now(datetime.UTC).astimezone().tzinfo
def try_parse(text: str, fmt: str) -> datetime.datetime | None: def try_parse(text:str, fmt:str) -> datetime.datetime | None:
try: try:
time = datetime.datetime.strptime(text, fmt) time = datetime.datetime.strptime(text, fmt)
if time.tzinfo is None: if time.tzinfo is None:

View File

@ -39,8 +39,6 @@ def csv_str_to_value(
| bool | bool
| None | None
): ):
if s is None:
return None
s = s.strip() s = s.strip()
if len(s) == 0: if len(s) == 0:
return None return None
@ -108,10 +106,10 @@ def deduplicate_by_ignoring_certain_fields(
def deduplicate_dicts( def deduplicate_dicts(
dicts: Sequence[dict[str, typing.Any] | frozendict[str, typing.Any]], dicts: Sequence[dict[str,typing.Any] | frozendict[str,typing.Any]],
deduplicate_mode: data.DeduplicateMode, deduplicate_mode: data.DeduplicateMode,
deduplicate_ignore_columns: list[str], deduplicate_ignore_columns: list[str],
) -> tuple[Sequence[dict[str, typing.Any]], list[str]]: ) -> tuple[Sequence[dict[str,typing.Any]], list[str]]:
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
fieldnames = [] fieldnames = []
@ -141,13 +139,9 @@ def deduplicate_dicts(
return dicts, fieldnames return dicts, fieldnames
def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]: def normalize_dict(d: dict[str,typing.Any]) -> frozendict[str,typing.Any]:
return frozendict( return frozendict(
{ {k: csv_str_to_value(str(v)) for k, v in d.items() if csv_str_to_value(str(v)) is not None},
k: csv_str_to_value(str(v))
for k, v in d.items()
if csv_str_to_value(str(v)) is not None
},
) )
@ -172,7 +166,7 @@ def load_csv_file(csv_file: Path) -> list[frozendict]:
def extend_csv_file( def extend_csv_file(
csv_file: Path, csv_file: Path,
new_dicts: list[dict[str, typing.Any]], new_dicts: list[dict[str,typing.Any]],
deduplicate_mode: data.DeduplicateMode, deduplicate_mode: data.DeduplicateMode,
deduplicate_ignore_columns: list[str], deduplicate_ignore_columns: list[str],
) -> dict: ) -> dict:
@ -202,7 +196,7 @@ def extend_csv_file(
) )
writer.writeheader() writer.writeheader()
for d in dicts: for d in dicts:
writable_d = {k: csv_safe_value(v) for k, v in d.items()} writable_d = {k:csv_safe_value(v) for k,v in d.items()}
writer.writerow(writable_d) writer.writerow(writable_d)
del d, writable_d del d, writable_d
output_csv = csvfile_in_memory.getvalue() output_csv = csvfile_in_memory.getvalue()

View File

@ -24,6 +24,7 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
cal.add('version', '2.0') cal.add('version', '2.0')
for event_data in rows: for event_data in rows:
# Select data # Select data
possible_time_keys = [ possible_time_keys = [
k for k, v in event_data.items() if isinstance(v, datetime.date) k for k, v in event_data.items() if isinstance(v, datetime.date)
@ -40,9 +41,7 @@ def generate_calendar(rows: list[dict]) -> icalendar.Calendar:
continue continue
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts]) title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
description = '\n\n'.join( description = '\n\n'.join(event_data[k] for k in possible_name_keys[max_title_parts:])
event_data[k] for k in possible_name_keys[max_title_parts:]
)
# Create event # Create event
event = icalendar.Event() event = icalendar.Event()

View File

@ -4,3 +4,4 @@ import personal_data.main
def test_available(): def test_available():
names = personal_data.main.available_scraper_names() names = personal_data.main.available_scraper_names()
assert len(names) > 0 assert len(names) > 0