1
0

Compare commits

...

4 Commits

Author SHA1 Message Date
72be664d82
Ruff
All checks were successful
Test Python / Test (push) Successful in 31s
2024-10-13 15:20:30 +02:00
477fce869d
Moved ActivitySample into personal_data 2024-10-13 15:20:18 +02:00
eb3518ba88
Renamed WorkSample to ActivitySample 2024-10-13 15:05:55 +02:00
f47daa3256
Merging git_time_tracker's advanced CSV parsing into personal_data 2024-10-13 15:04:18 +02:00
12 changed files with 292 additions and 219 deletions

View File

@ -32,12 +32,11 @@ import sys
from collections.abc import Iterator
from pathlib import Path
from .data import (
HIDDEN_LABEL_PREFIX,
HIDDEN_LABEL_TOTAL,
RealizedWorkSample,
WorkSample,
from personal_data.activity import (
ActivitySample,
RealizedActivitySample,
)
from .format import cli, icalendar
from .source import csv_file, git_repo
@ -51,16 +50,16 @@ MINUTE = datetime.timedelta(minutes=1)
def filter_samples(
samples: list[WorkSample],
samples: list[ActivitySample],
sample_filter: set[str],
) -> list[WorkSample]:
) -> list[ActivitySample]:
assert len(sample_filter) > 0
return [s for s in samples if set(s.labels).intersection(sample_filter)]
def heuristically_realize_samples(
samples: list[WorkSample],
) -> Iterator[RealizedWorkSample]:
samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
"""Secret sauce.
Guarantees that:
@ -87,7 +86,9 @@ def heuristically_realize_samples(
start_at = max(previous_sample_end, end_at - estimated_duration)
del estimated_duration
yield RealizedWorkSample(labels=sample.labels, end_at=end_at, start_at=start_at)
yield RealizedActivitySample(
labels=sample.labels, end_at=end_at, start_at=start_at,
)
previous_sample_end = sample.end_at
del sample
@ -137,8 +138,8 @@ def parse_arguments():
return parser.parse_args()
def load_samples(args) -> set[WorkSample]:
shared_time_stamps_set: set[WorkSample] = set()
def load_samples(args) -> set[ActivitySample]:
shared_time_stamps_set: set[ActivitySample] = set()
# Git repositories
for repo_path in args.repositories:

View File

@ -1,19 +0,0 @@
import dataclasses
import datetime
from collections.abc import Sequence
HIDDEN_LABEL_PREFIX = '__'
HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
@dataclasses.dataclass(frozen=True, order=True)
class WorkSample:
labels: Sequence[str]
start_at: datetime.datetime | None
end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedWorkSample(WorkSample):
start_at: datetime.datetime
end_at: datetime.datetime

View File

@ -1,7 +1,7 @@
import datetime
from collections.abc import Iterator
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
from personal_data.activity import HIDDEN_LABEL_CATEGORY, Label, RealizedActivitySample
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
@ -29,18 +29,18 @@ def fmt_year_ranges(years: list[int]) -> str:
return ''.join(list(fmt_year_ranges_internal(years)))
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
hours = int(total_time / HOUR)
minutes = int((total_time - hours * HOUR) / MINUTE)
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
return f' {label.category:10} {label.label:40} {hours:-4d}h {minutes:-2d}m'
def generate_report(
samples: list[RealizedWorkSample],
samples: list[RealizedActivitySample],
) -> Iterator[str]:
# Time spent per label
time_per_label: dict[str, datetime.timedelta] = {}
years_per_label: dict[str, set[int]] = {}
time_per_label: dict[Label, datetime.timedelta] = {}
years_per_label: dict[Label, set[int]] = {}
for sample in samples:
duration = sample.end_at - sample.start_at
@ -57,15 +57,13 @@ def generate_report(
#
yield '-' * 66
yield '\n'
for total_time, label_and_type in time_and_label:
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
for total_time, label in time_and_label:
if label.category == HIDDEN_LABEL_CATEGORY:
continue
label_type, label = label_and_type.split(':', 1)
yield fmt_line(label_type, label, total_time)
yield fmt_line(label, total_time)
yield ' ('
yield fmt_year_ranges(years_per_label.get(label_and_type, []))
yield fmt_year_ranges(years_per_label.get(label, []))
yield ')'
yield '\n'
del label, total_time
@ -73,5 +71,6 @@ def generate_report(
yield '-' * 66
yield '\n'
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
label_total = Label(HIDDEN_LABEL_CATEGORY, 'total')
yield fmt_line(label_total, time_per_label.get(label_total, ZERO_DURATION))
yield '\n'

View File

@ -2,30 +2,28 @@ import datetime
import icalendar
from ..data import HIDDEN_LABEL_PREFIX, RealizedWorkSample
from personal_data.activity import HIDDEN_LABEL_CATEGORY, RealizedActivitySample
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def create_title(sample: RealizedWorkSample) -> tuple[str, str]:
def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
ls = []
desc = []
for label_and_type in sample.labels:
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
continue
if label_and_type.startswith('author:'):
for label in sample.labels:
if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
continue
if len(ls) == 0:
ls.append(label_and_type.split(':')[1])
ls.append(label.label)
else:
desc.append(label_and_type)
desc.append(label.label)
return ' '.join(ls), '\n'.join(desc)
def generate_calendar(
samples: list[RealizedWorkSample],
samples: list[RealizedActivitySample],
) -> icalendar.Calendar:
max_title_parts = 2
@ -44,11 +42,11 @@ def generate_calendar(
event.add('dtstart', sample.start_at)
event.add('dtend', sample.end_at)
for label_and_type in sample.labels:
if label_and_type.startswith('author:'):
for label in sample.labels:
if label.category == 'author':
event.add(
'organizer',
'mailto:' + label_and_type.removeprefix('author:'),
'mailto:' + label.label,
)
cal.add_component(event)
@ -58,7 +56,7 @@ def generate_calendar(
def generate_icalendar_file(
samples: list[RealizedWorkSample],
samples: list[RealizedActivitySample],
file: str,
) -> None:
calendar = generate_calendar(samples)

View File

@ -1,82 +1,15 @@
import datetime
import urllib.parse
from typing import Any
from collections.abc import Iterator
from decimal import Decimal
from pathlib import Path
import dataclasses
from typing import Any
from personal_data.util import load_csv_file
from personal_data.activity import ActivitySample, Label
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
from ..data import WorkSample
@dataclasses.dataclass
class PossibleKeys:
time_start: list[str]
time_end: list[str]
duration: list[str]
name: list[str]
image: list[str]
misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
# Select data
time_keys = [
k for k, v in event_data.items() if isinstance(v, datetime.date)
]
duration_keys = [
k
for k, v in event_data.items()
if isinstance(v, Decimal) and 'duration_seconds' in k
]
name_keys = [k for k, v in event_data.items() if isinstance(v, str)]
image_keys = [
k for k, v in event_data.items() if isinstance(v, urllib.parse.ParseResult)
]
misc_keys = list(event_data.keys())
for k in image_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
for k in time_keys:
if k in misc_keys:
misc_keys.remove(k)
del k
time_start_keys = [k for k in time_keys if 'start' in k.lower() ]
time_end_keys = [k for k in time_keys if 'end' in k.lower() or 'stop' in k.lower() ]
return PossibleKeys(
time_start = time_start_keys,
time_end = time_end_keys,
duration = duration_keys,
name = name_keys,
image = image_keys,
misc = misc_keys,
)
def start_end(sample: dict[str,Any], keys: PossibleKeys) -> tuple[datetime.datetime | None, datetime.datetime | None]:
if keys.time_start and keys.time_end:
return (sample[keys.time_start[0]], sample[keys.time_end[0]])
if keys.time_start and keys.duration:
start = sample[keys.time_start[0]]
duration = datetime.timedelta(seconds=float(sample[keys.duration[0]]))
return (start, start + duration)
if keys.time_start:
start = sample[keys.time_start[0]]
return (start, None)
if keys.time_end:
return (None, sample[keys.time_end[0]])
return (None, None)
def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample]:
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
assert len(rows) > 0
max_title_parts = 2
if True:
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data)
@ -86,20 +19,19 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample
assert len(possible_keys.image) >= 0
for event_data in rows:
'''
"""
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
description = '\n\n'.join(
event_data[k] for k in possible_name_keys[max_title_parts:]
)
image = event_data[possible_keys.image[0]] if possible_keys.image else None
'''
"""
(start_at, end_at) = start_end(event_data, possible_keys)
labels = [f'{k}:{event_data[k]}' for k in possible_keys.misc]
labels = [Label(k, event_data[k]) for k in possible_keys.misc]
# Create event
yield WorkSample(
yield ActivitySample(
labels=tuple(labels),
start_at=start_at,
end_at=end_at,
@ -108,7 +40,7 @@ def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample
del event_data
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[WorkSample]:
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[ActivitySample]:
dicts = load_csv_file(file_path)
samples = list(iterate_samples_from_dicts(dicts))
assert len(samples) > 0, 'Did not found any samples'

View File

@ -5,7 +5,7 @@ from pathlib import Path
import git
from ..data import HIDDEN_LABEL_TOTAL, WorkSample
from personal_data.activity import HIDDEN_LABEL_CATEGORY, ActivitySample, Label
logger = logging.getLogger(__name__)
@ -25,7 +25,7 @@ def determine_project_name(repo: git.Repo) -> str:
return Path(repo.working_tree_dir).name
def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
project_name = determine_project_name(repo)
assert project_name is not None
@ -34,9 +34,9 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
repo.commit()
for commit in repo.iter_commits(determine_default_branch(repo)):
labels = [HIDDEN_LABEL_TOTAL]
labels.append('project:' + project_name)
labels.append('author:' + commit.author.email)
labels = [Label(HIDDEN_LABEL_CATEGORY, 'total')]
labels.append(Label('project', project_name))
labels.append(Label('author', commit.author.email))
authored_date = datetime.datetime.fromtimestamp(
commit.authored_date,
@ -47,13 +47,13 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
tz=datetime.UTC,
)
yield WorkSample(
yield ActivitySample(
labels=tuple(labels),
start_at=None,
end_at=authored_date,
)
if authored_date != committed_date:
yield WorkSample(
yield ActivitySample(
labels=tuple(labels),
start_at=None,
end_at=committed_date,
@ -61,7 +61,7 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
del labels
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[WorkSample]:
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[ActivitySample]:
try:
yield from get_samples_from_project(git.Repo(repo_path))
except git.exc.InvalidGitRepositoryError:

29
personal_data/activity.py Normal file
View File

@ -0,0 +1,29 @@
import dataclasses
import datetime
from collections.abc import Sequence
HIDDEN_LABEL_CATEGORY = '__'
@dataclasses.dataclass(frozen=True, order=True)
class Label:
    """A ``category``/``label`` pair describing one aspect of an activity.

    Instances are immutable, hashable, and ordered by ``(category, label)``.
    """

    category: str
    label: str

    def __post_init__(self):
        # The category must be present and must not contain ':'.
        assert self.category is not None and ':' not in self.category
        # The label only has to be present.
        assert self.label is not None
@dataclasses.dataclass(frozen=True, order=True)
class ActivitySample:
    """An observed activity: its labels plus optional start and end times."""

    # Labels describing the activity.
    labels: Sequence[Label]
    # Either time bound may be None.
    start_at: datetime.datetime | None
    end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample):
    """An ``ActivitySample`` whose start and end are both annotated as required."""

    # Same fields as the base class, narrowed to non-optional datetimes.
    start_at: datetime.datetime
    end_at: datetime.datetime

150
personal_data/csv_import.py Normal file
View File

@ -0,0 +1,150 @@
import csv
import dataclasses
import datetime
import decimal
import typing
import urllib.parse
from collections.abc import Callable
from decimal import Decimal
from pathlib import Path
from typing import Any
from frozendict import frozendict
# Name of the CSV dialect used by this module. It is registered with the csv
# module when this file is imported: lines end with '\n', and
# skipinitialspace is enabled.
CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
# Type variable for the result type of the parser handed to try_value.
T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
    """Apply the parser ``fn`` to ``s``; return None when parsing fails.

    Only ``ValueError`` and ``decimal.InvalidOperation`` count as a failed
    parse. Any other exception raised by ``fn`` propagates to the caller.
    """
    try:
        parsed = fn(s)
    except (ValueError, decimal.InvalidOperation):
        return None
    return parsed
def csv_str_to_value(
    s: str,
) -> (
    str
    | Decimal
    | datetime.date
    | datetime.datetime
    | urllib.parse.ParseResult
    | bool
    | None
):
    """Convert the text of a CSV cell into the most specific value it parses as.

    The input is stripped first. Candidates are tried in this order, and the
    first match wins:

    1. ``None`` or an empty/whitespace-only cell gives ``None``.
    2. ``Decimal``.
    3. ISO date, then ISO datetime.
    4. Text starting with ``http://`` or ``https://`` gives a parsed URL.
    5. ``false`` / ``true`` / ``none``, case-insensitive.
    6. Anything else is returned as the stripped string.
    """
    assert not isinstance(s, list)  # TODO?
    if s is None:
        return None

    text = s.strip()
    if not text:
        return None

    # Order matters: the first parser that accepts the text decides the type.
    parsers = (
        Decimal,
        datetime.date.fromisoformat,
        datetime.datetime.fromisoformat,
    )
    for parser in parsers:
        parsed = try_value(parser, text)
        if parsed is not None:
            return parsed

    if text.startswith(('http://', 'https://')):
        return urllib.parse.urlparse(text)

    keywords = {'false': False, 'true': True, 'none': None}
    lowered = text.lower()
    if lowered in keywords:
        return keywords[lowered]
    return text
def load_csv_file(
    csv_file: Path,
    sniff: bool = False,
) -> list[frozendict[str, typing.Any]]:
    """Read a CSV file into a list of immutable, type-converted rows.

    Every cell goes through ``csv_str_to_value``. Cells that convert to
    ``None`` are dropped from their row, so rows may have different key sets.

    Args:
        csv_file: Path of the CSV file to read.
        sniff: When true, guess the dialect from the first 1024 characters
            of the file; otherwise read with ``CSV_DIALECT``.

    Returns:
        One ``frozendict`` per data row, in file order.

    Raises:
        Whatever ``open`` raises for an unreadable path, and whatever the
        csv sniffer/reader raise for malformed input.
    """
    dicts: list[frozendict] = []
    # The csv module requires files to be opened with newline=''. Without it,
    # newlines embedded in quoted fields are not interpreted correctly.
    with open(csv_file, newline='') as csvfile:
        if sniff:
            dialect = csv.Sniffer().sniff(csvfile.read(1024))
            csvfile.seek(0)  # Rewind so the reader sees the header line too.
        else:
            dialect = CSV_DIALECT
        reader = csv.DictReader(csvfile, dialect=dialect)
        for row in reader:
            converted = {k: csv_str_to_value(v) for k, v in row.items()}
            dicts.append(
                frozendict({k: v for k, v in converted.items() if v is not None}),
            )
    return dicts
@dataclasses.dataclass
class PossibleKeys:
    """Keys of a row, grouped by the role each key may play."""

    # Candidate keys for the start and the end of the time span.
    time_start: list[str]
    time_end: list[str]
    # Candidate keys for a duration.
    duration: list[str]
    # Candidate keys for names.
    name: list[str]
    # Candidate keys for images.
    image: list[str]
    # Remaining keys.
    misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
    """Group the keys of one row by the type of value each key holds.

    * date/datetime values are time keys. They count as start keys when the
      key contains ``start``, and as end keys when it contains ``end``,
      ``stop`` or ``last`` (case-insensitive).
    * ``Decimal`` values whose key contains ``duration_seconds`` are
      duration keys.
    * ``str`` values are name keys.
    * parsed URLs are image keys.
    * misc is every key that is neither a time key nor an image key.

    Key order within each group follows the order of ``event_data``.
    """
    time_keys: list[str] = []
    duration_keys: list[str] = []
    name_keys: list[str] = []
    image_keys: list[str] = []

    # One pass over the row, with independent checks per category.
    for key, value in event_data.items():
        if isinstance(value, datetime.date):
            time_keys.append(key)
        if isinstance(value, Decimal) and 'duration_seconds' in key:
            duration_keys.append(key)
        if isinstance(value, str):
            name_keys.append(key)
        if isinstance(value, urllib.parse.ParseResult):
            image_keys.append(key)

    not_misc = {*image_keys, *time_keys}
    misc_keys = [key for key in event_data if key not in not_misc]

    time_start_keys: list[str] = []
    time_end_keys: list[str] = []
    for key in time_keys:
        lowered = key.lower()
        if 'start' in lowered:
            time_start_keys.append(key)
        if any(word in lowered for word in ('end', 'stop', 'last')):
            time_end_keys.append(key)

    return PossibleKeys(
        time_start=time_start_keys,
        time_end=time_end_keys,
        duration=duration_keys,
        name=name_keys,
        image=image_keys,
        misc=misc_keys,
    )
def start_end(
    sample: dict[str, Any],
    keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    """Extract the ``(start, end)`` time bounds of one row.

    The first candidate key of each kind in ``keys`` is used. Preference:
    explicit start and end; else start plus duration in seconds; else start
    only; else end only; else no bounds.

    A row may lack a key that ``keys`` lists, because ``keys`` is derived
    from a different, representative row. A missing value yields ``None``
    for that bound instead of raising ``KeyError``.

    Args:
        sample: The row to read the values from.
        keys: Candidate keys; ``time_start``, ``time_end`` and ``duration``
            are consulted.

    Returns:
        ``(start, end)``, where either element may be ``None``.
    """
    start = sample.get(keys.time_start[0]) if keys.time_start else None

    if keys.time_start and keys.time_end:
        return (start, sample.get(keys.time_end[0]))
    if keys.time_start and keys.duration:
        seconds = sample.get(keys.duration[0])
        if start is None or seconds is None:
            # The end cannot be derived without both start and duration.
            return (start, None)
        return (start, start + datetime.timedelta(seconds=float(seconds)))
    if keys.time_start:
        return (start, None)
    if keys.time_end:
        return (None, sample.get(keys.time_end[0]))
    return (None, None)

View File

@ -1,68 +1,22 @@
import _csv
import csv
import datetime
import decimal
import io
import logging
import typing
import urllib.parse
from collections.abc import Callable, Iterable, Mapping, Sequence
from decimal import Decimal
from collections.abc import Iterable, Mapping, Sequence
from pathlib import Path
from typing import Any
from frozendict import frozendict
from . import data
from . import csv_import, data
logger = logging.getLogger(__name__)
CSV_DIALECT = 'one_true_dialect'
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def csv_str_to_value(
s: str,
) -> (
str
| Decimal
| datetime.date
| datetime.datetime
| urllib.parse.ParseResult
| bool
| None
):
if s is None:
return None
s = s.strip()
if len(s) == 0:
return None
if (v_decimal := try_value(Decimal, s)) is not None:
return v_decimal
if (v_date := try_value(datetime.date.fromisoformat, s)) is not None:
return v_date
if (v_datetime := try_value(datetime.datetime.fromisoformat, s)) is not None:
return v_datetime
if s.startswith(('http://', 'https://')):
return urllib.parse.urlparse(s)
if s.lower() == 'false':
return False
if s.lower() == 'true':
return True
if s.lower() == 'none':
return None
return s
def csv_safe_value(v: object) -> str:
def csv_safe_value(v: Any) -> str:
if isinstance(v, urllib.parse.ParseResult):
return v.geturl()
if isinstance(v, datetime.datetime):
@ -145,32 +99,13 @@ def deduplicate_dicts(
def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
return frozendict(
{
k: csv_str_to_value(str(v))
k: csv_import.csv_str_to_value(str(v))
for k, v in d.items()
if csv_str_to_value(str(v)) is not None
if csv_import.csv_str_to_value(str(v)) is not None
},
)
def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]:
dicts: list[frozendict] = []
with open(csv_file) as csvfile:
dialect = csv.Sniffer().sniff(csvfile.read(1024))
csvfile.seek(0)
reader = csv.DictReader(csvfile, dialect=dialect)
for row in reader:
for k in list(row.keys()):
orig = row[k]
row[k] = csv_str_to_value(orig)
if row[k] is None:
del row[k]
del k, orig
dicts.append(frozendict(row))
del row
del csvfile
return dicts
def extend_csv_file(
csv_file: Path,
new_dicts: list[dict[str, typing.Any]],
@ -180,7 +115,7 @@ def extend_csv_file(
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
try:
dicts = load_csv_file(csv_file)
dicts = csv_import.load_csv_file(csv_file)
except (FileNotFoundError, _csv.Error) as e:
logger.info('Creating file: %s', csv_file)
dicts = []
@ -199,7 +134,7 @@ def extend_csv_file(
writer = csv.DictWriter(
csvfile_in_memory,
fieldnames=fieldnames,
dialect=CSV_DIALECT,
dialect=csv_import.CSV_DIALECT,
)
writer.writeheader()
for d in dicts:

24
test/test_csv_import.py Normal file
View File

@ -0,0 +1,24 @@
import datetime
import frozendict
from personal_data.csv_import import determine_possible_keys
def test_determine_possible_keys():
    """A lone 'last played' timestamp is classified as an end time."""
    last_played = datetime.datetime(
        2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
    )
    row = frozendict.frozendict(
        {
            'game.name': 'Halo',
            'me.last_played_time': last_played,
            'trophy.name': 'Test',
            'trophy.desc': 'Description',
        },
    )

    result = determine_possible_keys(row)

    assert result.time_start == []
    assert result.time_end == ['me.last_played_time']
    assert result.duration == []
    # The three string-valued columns.
    assert len(result.name) == 3

View File

@ -3,13 +3,37 @@ from decimal import Decimal
import pytest
from personal_data.util import csv_str_to_value
from personal_data.csv_import import csv_str_to_value
PARSE_MAPPINGS = [
(
'2024-04-28 21:35:40+00:00',
datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC),
),
(
'2024-07-06 19:30:11+02:00',
datetime.datetime(
2024,
7,
6,
19,
30,
11,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
(
'2023-10-21 11:43:27+02:00',
datetime.datetime(
2023,
10,
21,
11,
43,
27,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
(
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',