
Compare commits

acaedcbd3ab3c312f9eb185d3c631adcc18475b3..088cac75fce0c03d155db00298fce2899cfd893a

No commits in common. "acaedcbd3ab3c312f9eb185d3c631adcc18475b3" and "088cac75fce0c03d155db00298fce2899cfd893a" have entirely different histories.

11 changed files with 45 additions and 95 deletions

View File

@@ -57,6 +57,8 @@ def generate_report(
     time_and_label = [(duration, label) for label, duration in time_per_label.items()]
     time_and_label.sort(reverse=True)
     #
+    yield '-' * LINE_LENGTH
+    yield '\n'
     for total_time, label in time_and_label:

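Side note on this hunk: the two added yield statements emit a dashed separator line into the report stream before the per-label rows. A minimal consumption sketch, assuming generate_report yields plain string fragments; the samples argument is illustrative, since the real signature is cut off above:

    # Hypothetical caller: join the yielded fragments into the final report.
    report_text = ''.join(generate_report(samples))
    print(report_text)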
View File

@@ -25,6 +25,8 @@ def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
 def generate_calendar(
     samples: list[RealizedActivitySample],
 ) -> icalendar.Calendar:
+    max_title_parts = 2
     cal = icalendar.Calendar()
     cal.add('prodid', '-//personal_data_calendar//example.org//')
     cal.add('version', '2.0')

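For context on the calls above: icalendar.Calendar is a dict-like component, and prodid/version are the two properties RFC 5545 requires on every calendar object. A minimal serialization sketch using the standard icalendar API (the output file name is illustrative):

    import icalendar

    cal = icalendar.Calendar()
    cal.add('prodid', '-//personal_data_calendar//example.org//')
    cal.add('version', '2.0')

    # to_ical() renders the component tree as RFC 5545 bytes.
    with open('calendar.ics', 'wb') as f:
        f.write(cal.to_ical())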
View File

@@ -10,9 +10,8 @@ logger = getLogger(__name__)
 def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
-    if len(rows) == 0:
-        message = 'No rows provided'
-        raise ValueError(message)
+    assert len(rows) > 0
+    max_title_parts = 2
     if True:
         event_data = rows[len(rows) // 2]  # Hopefully select a useful representative.
@@ -20,9 +19,8 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
     logger.info('Found possible keys: %s', possible_keys)
     del event_data
-    if len(possible_keys.time_start) + len(possible_keys.time_end) < 1:
-        message = 'No time columns found in data'
-        raise ValueError(message)
+    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
+    assert len(possible_keys.image) >= 0
     for event_data in rows:
         """
@@ -49,7 +47,5 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
 def iterate_samples_from_csv_file(file_path: Path) -> Iterator[ActivitySample]:
     dicts = load_csv_file(file_path)
     samples = list(iterate_samples_from_dicts(dicts))
-    if len(samples) == 0:
-        message = 'Did not find any samples'
-        raise ValueError(message)
+    assert len(samples) > 0, 'Did not found any samples'
     yield from samples

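The recurring change in this file, raise ValueError(...) on the removed lines versus bare assert on the added lines, is more than style: assert statements are removed entirely when Python runs with the -O flag, so the assert-based guards silently pass on bad input in optimized mode. A minimal illustration of the difference:

    # Under `python -O`, __debug__ is False and asserts are compiled away.
    def guarded(rows):
        assert len(rows) > 0          # disappears under -O
        ...

    def guarded_checked(rows):
        if len(rows) == 0:            # survives -O, always raises on empty input
            raise ValueError('No rows provided')
        ...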
View File

@@ -13,10 +13,9 @@ logger = logging.getLogger(__name__)
 def determine_default_branch(repo: git.Repo):
     try:
         repo.commit('main')
-    except Exception:
-        return 'master'
-    else:
         return 'main'
+    except:
+        return 'master'

 def determine_project_name(repo: git.Repo) -> str:
@@ -28,9 +27,7 @@ def determine_project_name(repo: git.Repo) -> str:
 def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
     project_name = determine_project_name(repo)
-    if project_name is None:
-        message = 'Could not determine project name'
-        raise ValueError(message)
+    assert project_name is not None
     # TODO: Branch on main or master or default

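One behavioural note on the determine_default_branch hunk: the removed except Exception leaves BaseException subclasses alone, while the added bare except: also catches KeyboardInterrupt and SystemExit, so a Ctrl-C during the repo.commit('main') probe would be misreported as 'master'. Quick demonstration of what a bare except swallows:

    try:
        raise KeyboardInterrupt       # e.g. the user pressing Ctrl-C
    except:                           # equivalent to `except BaseException:`
        print('swallowed')            # reached; `except Exception:` would not catch this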
View File

@@ -1 +1 @@
-"""HTML-Data Formatting."""
+"""# HTML-Data Formatting"""

View File

@@ -13,7 +13,11 @@ ROOT_DIRECTORY = Path('output')
 @bottle.route('/<csv_type>/newest')
 def newest_entry(csv_type: str):
-    """Loads a CSV file and finds the newest entry based on 'time.current' column, returns as JSON."""
+    """
+    Loads a CSV file (default: data.csv, overridable by query param 'file'),
+    finds the newest entry based on the 'time.current' column, and returns it as JSON.
+    """
     path = ROOT_DIRECTORY / f'{csv_type}.csv'
     bottle.response.content_type = 'application/json'
@@ -29,10 +33,10 @@ def newest_entry(csv_type: str):
         bottle.response.status = 404
         return {'error': 'CSV file is empty or no data found'}
-    time_column = 'time.current'
+    TIME_COLUMN = 'time.current'
-    if time_column in data[0]:
-        newest = max(data, key=lambda r: r.get(time_column))
+    if TIME_COLUMN in data[0]:
+        newest = max(data, key=lambda r: r.get(TIME_COLUMN))
     else:
         newest = data[-1]

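For reference, the route above maps /<csv_type>/newest onto a CSV file under ROOT_DIRECTORY, so a request for csv_type 'weight' would read output/weight.csv and return its newest row as JSON. An illustrative client call (host, port, and the 'weight' dataset name are assumptions, not taken from the diff):

    import requests

    resp = requests.get('http://localhost:8080/weight/newest')
    print(resp.json())   # newest row of output/weight.csv, keyed by column name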
View File

@@ -39,19 +39,18 @@ def to_text_duration(duration: datetime.timedelta) -> str:
     duration -= minutes * MINUTE
     seconds = int(duration / SECOND)
-    components = []
+    l = []
     if hours > 0:
-        components.append(f'{hours} hours')
+        l.append(f'{hours} hours')
     if minutes > 0:
-        components.append(f'{minutes} minutes')
+        l.append(f'{minutes} minutes')
     if seconds > 0:
-        components.append(f'{seconds} seconds')
-    return ' '.join(components)
+        l.append(f'{seconds} seconds')
+    return ' '.join(l)

 def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
-    if len(rows) == 0:
-        raise ValueError("No rows provided for sample iteration")
+    assert len(rows) > 0
     if True:
         event_data = rows[len(rows) // 2]  # Hopefully select a useful representative.
@@ -59,10 +58,8 @@ def iterate_samples_from_rows(rows: Rows) -> Iterator[ActivitySample]:
     logger.info('Found possible keys: %s', possible_keys)
     del event_data
-    if len(possible_keys.time_start) + len(possible_keys.time_end) < 1:
-        raise ValueError("No time start or end keys found in data")
-    if len(possible_keys.image) < 0:
-        raise ValueError("Invalid number of image keys found")
+    assert len(possible_keys.time_start) + len(possible_keys.time_end) >= 1
+    assert len(possible_keys.image) >= 0
     for event_data in rows:
         (start_at, end_at) = start_end(event_data, possible_keys)
@@ -145,10 +142,10 @@ def import_stepmania_steps_csv(vault: ObsidianVault, rows: Rows) -> int:
         rows_per_date[date].append(row)
     del date, row
-    columns = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']
+    COLUMNS = ['score.w1', 'score.w2', 'score.w3', 'score.w4', 'score.w5']

     def all_steps(row: dict[str, int]):
-        return sum(row[column] for column in columns)
+        return sum(row[column] for column in COLUMNS)

     steps_per_date = {
         date: sum(all_steps(row) for row in rows)

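A usage sketch for to_text_duration from the first hunk above, assuming SECOND and MINUTE are the matching datetime.timedelta constants defined earlier in the file; zero-valued units are skipped and the plural suffix is fixed:

    >>> import datetime
    >>> to_text_duration(datetime.timedelta(hours=1, seconds=3))
    '1 hours 3 seconds'
    >>> to_text_duration(datetime.timedelta(minutes=5))
    '5 minutes'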
View File

@@ -1,46 +0,0 @@
-"""OpenScale SQLite database fetcher.
-
-Reads weight measurements from an OpenScale backup SQLite database.
-OpenScale is an open-source weight tracking app for Android.
-"""
-
-import dataclasses
-import datetime
-import sqlite3
-from pathlib import Path
-
-from personal_data.data import DeduplicateMode, Scraper
-
-DATABASE_PATH = '/home/jmaa/Notes/Rawbackupdata/ScaleWeight/2025-06-24_openScale.db'
-
-
-@dataclasses.dataclass(frozen=True)
-class OpenScale(Scraper):
-    dataset_name = 'openscale_measurements'
-    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
-
-    @staticmethod
-    def requires_cfscrape() -> bool:
-        return False
-
-    def scrape(self):
-        """Read weight measurements from OpenScale SQLite database."""
-        db_path = Path(DATABASE_PATH)
-        if not db_path.exists():
-            msg = f'OpenScale database not found at {DATABASE_PATH}'
-            raise FileNotFoundError(msg)
-
-        with sqlite3.connect(db_path) as conn:
-            conn.row_factory = sqlite3.Row
-            cursor = conn.cursor()
-            cursor.execute("""
-                SELECT datetime, weight
-                FROM scaleMeasurements
-                ORDER BY datetime
-            """)
-            for row in cursor.fetchall():
-                timestamp_ms = row['datetime']
-                dt = datetime.datetime.fromtimestamp(timestamp_ms / 1000, tz=datetime.timezone.utc)
-                yield {'datetime': dt, 'weight': row['weight']}

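The deleted fetcher stores OpenScale's datetime column as milliseconds since the Unix epoch, hence the division by 1000 before fromtimestamp. A worked example with an illustrative value:

    >>> import datetime
    >>> timestamp_ms = 1719187200000  # illustrative, not taken from the database
    >>> datetime.datetime.fromtimestamp(timestamp_ms / 1000, tz=datetime.timezone.utc)
    datetime.datetime(2024, 6, 24, 0, 0, tzinfo=datetime.timezone.utc)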
View File

@@ -11,8 +11,6 @@ from . import data, fetchers, notification, util
 logger = logging.getLogger(__name__)
-MIN_COOKIES_THRESHOLD = 10
-
 try:
     import cloudscraper
 except ImportError:
@@ -65,7 +63,8 @@ def get_session(
         )
     else:
         logger.error('Expected cloudscraper, but not defined!')
-    elif ignore_cache:
+    else:
+        if ignore_cache:
         logger.warning('HTTP cache disabled')
         return requests.Session()
     session = session_class(
@@ -99,13 +98,13 @@ def get_cookiejar(use_cookiejar: bool):
     if use_cookiejar:
         logger.warning('Got cookiejar from firefox')
         cookiejar = browser_cookie3.firefox()
-        if len(cookiejar) > MIN_COOKIES_THRESHOLD:
+        if len(cookiejar) > 10:
             return cookiejar
         browser_cookie3.firefox(
             '/home/jmaa/.cachy/mbui5xg7.default-release/cookies.sqlite',
         )
         logger.warning('Cookiejar has %s cookies', len(cookiejar))
-        if len(cookiejar) > MIN_COOKIES_THRESHOLD:
+        if len(cookiejar) > 10:
             return cookiejar
     logger.warning('No cookiejar is used')
     return []
@@ -145,7 +144,7 @@ def main(
             del result
         except requests.exceptions.HTTPError as e:
             logger.exception('Failed in running %s', scraper_cls.__name__)
-            logger.exception('User-Agent: %s', e.request.headers['user-agent'])
+            logger.error('User-Agent: %s', e.request.headers['user-agent'])
             continue
         status = util.extend_csv_file(
             OUTPUT_PATH / f'{scraper.dataset_name}.csv',

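On the last hunk above: Logger.exception logs at ERROR level and appends the active traceback, so two exception() calls in one except block print the same traceback twice; demoting the second call to logger.error keeps the User-Agent message without the duplicate traceback. Minimal demonstration:

    import logging

    logging.basicConfig()
    logger = logging.getLogger(__name__)

    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception('failed')   # message plus full traceback
        logger.error('failed')       # message only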
View File

@@ -16,7 +16,8 @@ logger = logging.getLogger(__name__)
 def safe_del(d: dict, *keys: str):
     for key in keys:
-        d.pop(key, None)
+        if key in d:
+            del d[key]

 def equals_without_fields(
@@ -63,8 +64,7 @@ def deduplicate_dicts(
     deduplicate_ignore_columns: list[str],
 ) -> tuple[list[frozendict[str, Any]], list[str]]:
     if not isinstance(deduplicate_ignore_columns, list):
-        message = str(deduplicate_ignore_columns)
-        raise TypeError(message)
+        raise TypeError(deduplicate_ignore_columns)
     fieldnames = []
     for d in dicts:
@@ -101,9 +101,7 @@ def dataclass_to_dict(obj) -> dict[str, Any]:
 def normalize_dict(d: dict[str, Any] | frozendict[str, Any]) -> frozendict[str, Any]:
     if not isinstance(d, dict) and not isinstance(d, frozendict):
         d = dataclass_to_dict(d)
-    if not isinstance(d, (dict, frozendict)):
-        message = 'Expected dict or frozendict'
-        raise TypeError(message)
+    assert isinstance(d, dict) or isinstance(d, frozendict), 'Not a dict'
     safe_values = [
         (k, csv_import.csv_str_to_value(csv_import.csv_safe_value(v)))
         for k, v in d.items()
@@ -120,8 +118,7 @@ def extend_csv_file(
     if deduplicate_ignore_columns == data.Scraper.deduplicate_ignore_columns:
         deduplicate_ignore_columns = []
     if not isinstance(deduplicate_ignore_columns, list):
-        message = str(deduplicate_ignore_columns)
-        raise TypeError(message)
+        raise TypeError(deduplicate_ignore_columns)
     try:
         original_dicts = csv_import.load_csv_file(csv_file)

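On the safe_del hunk at the top of this file: d.pop(key, None) and the if key in d: del d[key] pair behave the same for absent keys, in that neither raises KeyError; pop does it with a single lookup. Quick check:

    >>> d = {'a': 1}
    >>> d.pop('missing', None)        # absent key: returns the default, no KeyError
    >>> if 'missing' in d:            # two-step spelling of the same guard
    ...     del d['missing']
    >>> d
    {'a': 1}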
View File

@@ -66,9 +66,11 @@ def parse_version_file(text: str) -> str:
 def find_python_packages() -> list[str]:
-    """Find all python packages (directories containing __init__.py files)."""
+    """
+    Find all python packages. (Directories containing __init__.py files.)
+    """
     root_path = Path(PACKAGE_NAME)
-    packages: set[str] = {PACKAGE_NAME}
+    packages: set[str] = set([PACKAGE_NAME])
     # Search recursively
     for init_file in root_path.rglob('__init__.py'):
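The last change swaps a set literal for set([...]); both spellings produce the same one-element set, the literal just avoids building an intermediate list. A quick check, assuming PACKAGE_NAME is 'personal_data' (an assumption; only the identifier appears in the diff):

    >>> PACKAGE_NAME = 'personal_data'
    >>> {PACKAGE_NAME} == set([PACKAGE_NAME])
    True

The subsequent rglob('__init__.py') walk then visits every nested directory, collecting each one that contains a package marker.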