Compare commits


No commits in common. "ecab90985149feeaa64ba4e80bf67841acfac135" and "b648983ff2eda0c12101fa6f13a68f9c719f8ed4" have entirely different histories.

9 changed files with 49 additions and 355 deletions

View File

@@ -10,15 +10,11 @@ from typing import Any
 from personal_data.util import load_csv_file
-from .obsidian import Event, ObsidianVault
+from .obsidian import ObsidianVault
 logger = getLogger(__name__)
-Row = dict[str, Any]
-Rows = list[Row]
-def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
+def import_workout_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> int:
     num_updated = 0
     for row in rows:
         date = row['Date']
@@ -44,24 +40,22 @@ def import_workout_csv(vault: ObsidianVault, rows: Rows) -> int:
         del row, date
     return num_updated
-def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
+def import_step_counts_csv(vault: ObsidianVault, rows: list[dict[str,Any]]) -> int:
     MINIMUM = 300
     num_updated = 0
-    rows_per_date = {}
+    rows_per_day = {}
     for row in rows:
         date = row['Start'].date()
-        rows_per_date.setdefault(date, [])
-        rows_per_date[date].append(row)
+        rows_per_day.setdefault(date, [])
+        rows_per_day[date].append(row)
     del date, row
-    steps_per_date = {
-        date: sum(row['Steps'] for row in rows) for date, rows in rows_per_date.items()
-    }
-    for date, steps in steps_per_date.items():
+    steps_per_day = { date: sum(row['Steps'] for row in rows) for date, rows in rows_per_day.items()}
+    for date, steps in steps_per_day.items():
         if steps < MINIMUM:
             continue
         was_updated = vault.add_statistic(date, 'Steps', steps)
@@ -71,67 +65,14 @@ def import_step_counts_csv(vault: ObsidianVault, rows: Rows) -> int:
     return num_updated
-def import_watched_series_csv(vault: ObsidianVault, rows: Rows) -> int:
-    # TODO: Update to using git_time_tracker event parsing system
-    verb = 'Watched'
-    num_updated = 0
-    rows_per_date = {}
-    for row in rows:
-        date = row['me.last_played_time'].date()
-        rows_per_date.setdefault(date, [])
-        rows_per_date[date].append(row)
-    del date, row
-    del rows
-    def map_to_event(row: Row) -> Event:
-        start = (
-            row['me.last_played_time'].time().replace(second=0, microsecond=0, fold=0)
-        )
-        end = start
-        comment = '{} Episode {}: *{}*'.format(
-            row['season.name'],
-            row['episode.index'],
-            row['episode.name'],
-        )
-        return Event(start, end, verb, row['series.name'], comment)
-    for date, rows in rows_per_date.items():
-        events = [map_to_event(row) for row in rows]
-        was_updated = vault.add_events(date, events)
-        if was_updated:
-            num_updated += 1
-        del date, was_updated
-    return num_updated
 def import_data(obsidian_path: Path, dry_run=True):
     vault = ObsidianVault(obsidian_path, read_only=dry_run and 'silent' or None)
-    if False:
-        data_path = Path('/home/jmaa/Notes/workout.csv')
-        rows = load_csv_file(data_path)
-        logger.info('Loaded CSV with %d lines', len(rows))
-        num_updated = import_workout_csv(vault, rows)
-        logger.info('Updated %d files', num_updated)
-    if False:
-        data_path = Path(
-            '/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv',
-        )
-        rows = load_csv_file(data_path)
-        logger.info('Loaded CSV with %d lines', len(rows))
-        num_updated = import_step_counts_csv(vault, rows)
-        logger.info('Updated %d files', num_updated)
-    if True:
-        data_path = Path('output/show_episodes_watched.csv')
-        rows = load_csv_file(data_path)
-        logger.info('Loaded CSV with %d lines', len(rows))
-        rows = rows[:7]
-        num_updated = import_watched_series_csv(vault, rows)
-        logger.info('Updated %d files', num_updated)
+    #data_path = Path('/home/jmaa/Notes/workout.csv')
+    data_path = Path('/home/jmaa/personal-archive/misc-data/step_counts_2023-07-26_to_2024-09-21.csv')
+    rows = load_csv_file(data_path)
+    logger.info('Loaded CSV with %d lines', len(rows))
+    #num_updated = import_workout_csv(vault, rows)
+    num_updated = import_step_counts_csv(vault, rows)
+    logger.info('Updated %d files', num_updated)
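In the simpler of the two versions, import_data drives the whole flow: it opens the vault (read-only when dry_run is set), loads the hard-coded step-count CSV and hands it to import_step_counts_csv. A minimal usage sketch, assuming a hypothetical vault location:

    from pathlib import Path

    # Hypothetical vault path; dry_run=True maps to read_only='silent',
    # so add_statistic only logs and nothing is written to disk.
    import_data(Path('/home/user/Notes'), dry_run=True)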

View File

@@ -1,49 +1,17 @@
-import dataclasses
 import datetime
 import json
-import re
 from decimal import Decimal
 from logging import getLogger
 from pathlib import Path
 from typing import Any
 import frontmatter
-import marko
-import marko.md_renderer
 logger = getLogger(__name__)
 StatisticKey = str
-@dataclasses.dataclass(frozen=True)
-class Event:
-    start_time: datetime.time | None
-    end_time: datetime.time | None
-    verb: str
-    subject: str
-    comment: str
-@dataclasses.dataclass(frozen=True)
-class FileContents:
-    frontmatter: dict[str, Any]
-    blocks_pre_events: list
-    events: list[Event]
-    blocks_post_events: list
-MARKDOWN_PARSER = marko.Markdown()
-MARKDOWN_RENDERER = marko.md_renderer.MarkdownRenderer()
-FILE_FORMAT = """
-{blocks_pre_events}
-## Events
-{block_events}
-{blocks_post_events}
-"""
 class ObsidianVault:
     def __init__(self, vault_path: Path, read_only: bool = 'silent'):
         self.vault_path = vault_path
@@ -58,103 +26,57 @@ class ObsidianVault:
         self.read_only = read_only
     def get_statistic(
-        self,
-        date: datetime.date,
-        statistic_key: StatisticKey,
+        self, date: datetime.date, statistic_key: StatisticKey,
     ) -> Any | None:
-        if contents := self._get_date_contents(date):
-            return contents.frontmatter.get(statistic_key)
-        return None
+        try:
+            with open(self._date_file_path(date)) as f:
+                data = frontmatter.load(f)
+        except FileNotFoundError:
+            return None
+        return data.metadata.get(statistic_key)
-    def add_statistic(
-        self,
-        date: datetime.date,
-        statistic_key: StatisticKey,
-        amount: Any,
-    ) -> bool:
-        # Adjust arguments
-        if isinstance(amount, Decimal):
-            amount = float(amount)
-        # Check for silent
+    def add_statistic(
+        self, date: datetime.date, statistic_key: StatisticKey, amount: Any,
+    ) -> bool:
         if self.read_only == 'silent':
             logger.info(
-                'Read-only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
+                'Real only ObsidianVault ignoring add_statistic(%s, "%s", %s)',
                 date,
                 statistic_key,
                 amount,
             )
             return False
-        # Load contents
         self._create_date_if_not_present(date)
-        contents = self._get_date_contents(date)
-        # Update contents
-        if contents.frontmatter.get(statistic_key) == amount:
+        with open(self._date_file_path(date)) as f:
+            data = frontmatter.load(f)
+        if isinstance(amount, Decimal):
+            amount = float(amount)
+        if data.metadata.get(statistic_key) == amount:
             return False
-        contents.frontmatter[statistic_key] = amount
+        data.metadata[statistic_key] = amount
         if amount is None:
-            del contents.frontmatter[statistic_key]
-        # Save contents
-        self._save_contents(date, contents)
+            del data.metadata[statistic_key]
+        with open(self._date_file_path(date), 'wb') as f:
+            frontmatter.dump(data, f)
         return True
-    def add_events(self, date: datetime.date, events: list[Event]) -> bool:
+    def add_event(self, date: datetime.date, verb: str, subject: str) -> None:
         if self.read_only == 'silent':
             logger.info(
-                'Read-only ObsidianVault ignoring add_event(%s, "%s", ?)',
-                date,
-                events,
+                'Real only ObsidianVault ignoring add_event(%s, "%s", ?)', date, verb,
             )
-            return False
+            return
         self._create_date_if_not_present(date)
-        contents = self._get_date_contents(date)
-        contents.events.extend(events)
-        self._save_contents(date, contents)
-        return True
-    def get_events(self, date: datetime.date) -> list[Event]:
-        contents = self._get_date_contents(date)
-        if contents is None:
-            return []
-        return contents.events
-    def _get_date_contents(self, date: datetime.date) -> FileContents | None:
-        try:
-            with open(self._date_file_path(date)) as f:
-                file_frontmatter = frontmatter.load(f)
-        except FileNotFoundError:
-            return None
-        ast = MARKDOWN_PARSER.parse(str(file_frontmatter))
-        (pre_events, list_block_items, post_events) = find_events_list_block(ast)
-        events = [parse_event_string(list_item) for list_item in list_block_items]
-        return FileContents(file_frontmatter.metadata, pre_events, events, post_events)
-    def _save_contents(self, date: datetime.date, contents: FileContents) -> None:
-        logger.info('Formatting file "%s"', date)
-        blocks_pre_events = ''.join(
-            MARKDOWN_RENDERER.render(b) for b in contents.blocks_pre_events
-        )
-        blocks_post_events = ''.join(
-            MARKDOWN_RENDERER.render(b) for b in contents.blocks_post_events
-        )
-        block_events = '\n'.join(
-            '- ' + format_event_string(e) for e in unique(contents.events)
-        )
-        text = FILE_FORMAT.format(
-            blocks_pre_events=blocks_pre_events,
-            blocks_post_events=blocks_post_events,
-            block_events=block_events,
-        ).strip()
-        logger.info('Saving file "%s"', date)
-        with open(self._date_file_path(date), 'wb') as f:
-            frontmatter.dump(frontmatter.Post(text, **contents.frontmatter), f)
+        # TODO
     def _create_date_if_not_present(self, date: datetime.date):
         date_file = self._date_file_path(date)
@@ -176,62 +98,3 @@ class ObsidianVault:
     def _daily_template_path(self):
         return (self.vault_path / self.template_file_path).with_suffix('.md')
-def find_events_list_block(ast) -> tuple[list, list[str], list]:
-    blocks = ast.children
-    for block_i, block in enumerate(blocks):
-        if (
-            isinstance(block, marko.block.Heading)
-            and block.children[0].children.lower() == 'events'
-        ):
-            events_block = ast.children[block_i + 1]
-            if isinstance(events_block, marko.block.List):
-                offset = 2
-                event_texts = [
-                    MARKDOWN_RENDERER.render_children(li).strip()
-                    for li in events_block.children
-                ]
-            else:
-                offset = 1
-                event_texts = []
-            return (blocks[:block_i], event_texts, blocks[block_i + offset :])
-    return (blocks, [], [])
-def format_event_string(event: Event) -> str:
-    assert event is not None
-    if (
-        event.start_time is None
-        and event.end_time is None
-        and event.subject is None
-        and event.verb is None
-    ):
-        return event.comment
-    return f'{event.start_time:%H:%M} | {event.verb} [[{event.subject}]]. {event.comment}'.strip()
-RE_TIME = r'(\d\d:\d\d(?::\d\d(?:\.\d+?))?)'
-def parse_event_string(event_str: str) -> Event:
-    if m := re.match(
-        r'^\s*' + RE_TIME + r'[ :\|-]*(\w+ed)\s+\[([^\]]*)\]\([^)]*\)\.?\s*(.*)$',
-        event_str,
-    ):
-        start = datetime.time.fromisoformat(m.group(1))
-        return Event(start, start, m.group(2), m.group(3), m.group(4))
-    if m := re.match(
-        r'^\s*' + RE_TIME + '[ :\|-]*(\w+ed)\s+\[\[([^\]]*)\]\]\.?\s*(.*)$',
-        event_str,
-    ):
-        start = datetime.time.fromisoformat(m.group(1))
-        return Event(start, start, m.group(2), m.group(3), m.group(4))
-    logger.info('Could not parse format: %s', event_str)
-    return Event(None, None, None, None, event_str)
-def unique(ls: list) -> list:
-    return list(dict.fromkeys(ls))
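The leaner get_statistic and add_statistic bypass the marko/Event machinery entirely and work on the daily note through the python-frontmatter package alone. A minimal sketch of that read-modify-write pattern, with a hypothetical note path standing in for the vault's per-date file:

    import frontmatter

    note_path = 'daily/2024-09-21.md'  # hypothetical; the vault derives this per date
    with open(note_path) as f:
        post = frontmatter.load(f)     # Post object: .metadata (YAML frontmatter) and .content (body)
    if post.metadata.get('Steps') != 12345:
        post.metadata['Steps'] = 12345
        with open(note_path, 'wb') as f:
            frontmatter.dump(post, f)  # re-serialize frontmatter plus body back to disk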

View File

@@ -53,8 +53,7 @@ class LodestoneAchievementScraper(Scraper):
             ).group(1)
             time_acquired = int(time_acquired)
             time_acquired = datetime.datetime.fromtimestamp(
-                time_acquired,
-                tz=datetime.UTC,
+                time_acquired, tz=datetime.UTC,
             )
             trophy_desc = (
                 entry.select_one('.entry__activity__txt').get_text().strip()

View File

@@ -51,18 +51,13 @@ class JellyfinWatchHistoryScraper(Scraper):
         client = JellyfinClient()
         client.config.app(
-            'personal_data',
-            _version.__version__,
-            'test_machine',
-            'unique_id_1',
+            'personal_data', _version.__version__, 'test_machine', 'unique_id_1',
         )
         client.config.data['auth.ssl'] = False
         client.auth.connect_to_address(secrets.JELLYFIN_URL)
         client.auth.login(
-            secrets.JELLYFIN_URL,
-            secrets.JELLYFIN_USERNAME,
-            secrets.JELLYFIN_PASSWORD,
+            secrets.JELLYFIN_URL, secrets.JELLYFIN_USERNAME, secrets.JELLYFIN_PASSWORD,
         )
         for series_data in iterate_series(client):

View File

@@ -61,8 +61,7 @@ class SteamAchievementScraper(Scraper):
         soup = bs4.BeautifulSoup(response.content, 'lxml')
         game_name: str = re.match(
-            r'Steam Community :: (.+) :: .*',
-            soup.head.title.get_text(),
+            r'Steam Community :: (.+) :: .*', soup.head.title.get_text(),
         ).group(1)
         soup = html_util.normalize_soup_slightly(

View File

@@ -1,95 +0,0 @@
-"""Withings API fetcher.
-Supports downloading activity summary from the [Withings
-API](https://developer.withings.com/api-reference/) using the [non-official
-Withings API Python Client](https://pypi.org/project/withings-api/).
-"""
-import dataclasses
-import datetime
-import logging
-import pickle
-import subprocess
-from pathlib import Path
-import withings_api
-from withings_api.common import CredentialsType
-from personal_data import secrets
-from personal_data.data import DeduplicateMode, Scraper
-logger = logging.getLogger(__name__)
-CREDENTIALS_FILE = Path('secrets/withings_oath_creds')
-def save_credentials(credentials: CredentialsType) -> None:
-    """Save credentials to a file."""
-    logger.info('Saving credentials in: %s', CREDENTIALS_FILE)
-    with open(CREDENTIALS_FILE, 'wb') as file_handle:
-        pickle.dump(credentials, file_handle)
-def load_credentials() -> CredentialsType:
-    """Load credentials from a file."""
-    logger.info('Using credentials saved in: %s', CREDENTIALS_FILE)
-    try:
-        with open(CREDENTIALS_FILE, 'rb') as file_handle:
-            return pickle.load(file_handle)
-    except FileNotFoundError:
-        return None
-@dataclasses.dataclass(frozen=True)
-class WithingsActivityScraper(Scraper):
-    dataset_name = 'withings_activity'
-    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
-    @staticmethod
-    def requires_cfscrape() -> bool:
-        return False
-    def oauth_flow(self) -> CredentialsType:
-        if creds := load_credentials():
-            return creds
-        auth = withings_api.WithingsAuth(
-            client_id=secrets.WITHINGS_CLIENTID,
-            consumer_secret=secrets.WITHINGS_SECRET,
-            callback_uri=secrets.WITHINGS_CALLBACK_URI,
-            scope=(
-                withings_api.AuthScope.USER_ACTIVITY,
-                withings_api.AuthScope.USER_METRICS,
-                withings_api.AuthScope.USER_INFO,
-                withings_api.AuthScope.USER_SLEEP_EVENTS,
-            ),
-        )
-        authorize_url = auth.get_authorize_url()
-        subprocess.run(['firefox', '--new-tab', authorize_url])
-        credentials_code = input('Please insert your code here: ').strip()
-        creds = auth.get_credentials(credentials_code)
-        save_credentials(creds)
-        return creds
-    def scrape(self):
-        credentials = self.oauth_flow()
-        # Now you are ready to make calls for data.
-        api = withings_api.WithingsApi(credentials)
-        start = datetime.date.today() - datetime.timedelta(days=200)
-        end = datetime.date.today()
-        activity_result = api.measure_get_activity(
-            startdateymd=start,
-            enddateymd=end,
-        )
-        for activity in activity_result.activities:
-            sample = dict(activity)
-            sample['date'] = activity.date.date()
-            del sample['timezone'], sample['is_tracker']
-            yield sample
-        del activity, sample

View File

@@ -61,9 +61,7 @@ def get_session(
     if cfscrape:
         session_class = CachedCfScrape
     session = session_class(
-        OUTPUT_PATH / 'web_cache',
-        cookies=cookiejar,
-        expire_after=CACHE_EXPIRE_DEFAULT,
+        OUTPUT_PATH / 'web_cache', cookies=cookiejar, expire_after=CACHE_EXPIRE_DEFAULT,
     )
     for cookie in cookiejar:
         session.cookies.set_cookie(cookie)

View File

@@ -40,8 +40,3 @@ MAILGUN_RECIPIENT = load_secret('MAILGUN_RECIPIENT')
 JELLYFIN_URL = load_secret('JELLYFIN_URL')
 JELLYFIN_USERNAME = load_secret('JELLYFIN_USERNAME')
 JELLYFIN_PASSWORD = load_secret('JELLYFIN_PASSWORD')
-# Withings
-WITHINGS_CLIENTID = load_secret('WITHINGS_CLIENTID')
-WITHINGS_SECRET = load_secret('WITHINGS_SECRET')
-WITHINGS_CALLBACK_URI = load_secret('WITHINGS_CALLBACK_URI')

View File

@@ -1,4 +1,3 @@
-import _csv
 import csv
 import datetime
 import decimal
@@ -152,7 +151,7 @@ def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
     )
-def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]:
+def load_csv_file(csv_file: Path) -> list[frozendict]:
     dicts: list[frozendict] = []
     with open(csv_file) as csvfile:
         dialect = csv.Sniffer().sniff(csvfile.read(1024))
@@ -181,7 +180,7 @@ def extend_csv_file(
     try:
         dicts = load_csv_file(csv_file)
-    except (FileNotFoundError, _csv.Error) as e:
+    except FileNotFoundError as e:
         logger.info('Creating file: %s', csv_file)
         dicts = []
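In both versions load_csv_file leans on the standard library's dialect sniffing before reading rows; a rough sketch of that pattern (file path hypothetical, not the project's exact code):

    import csv
    from pathlib import Path

    csv_file = Path('output/example.csv')  # hypothetical input
    with open(csv_file) as csvfile:
        dialect = csv.Sniffer().sniff(csvfile.read(1024))  # guess delimiter and quoting from a sample
        csvfile.seek(0)                                    # rewind after consuming the sample
        rows = list(csv.DictReader(csvfile, dialect=dialect))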