1
0

Compare commits

..

No commits in common. "72be664d82a1eeac9d2e59aa2f8196e26524a803" and "4f851b21b56e75d85090a31af3ac0aae1686d393" have entirely different histories.

12 changed files with 220 additions and 293 deletions

View File

@ -32,11 +32,12 @@ import sys
from collections.abc import Iterator
from pathlib import Path
from personal_data.activity import (
ActivitySample,
RealizedActivitySample,
from .data import (
HIDDEN_LABEL_PREFIX,
HIDDEN_LABEL_TOTAL,
RealizedWorkSample,
WorkSample,
)
from .format import cli, icalendar
from .source import csv_file, git_repo
@ -50,16 +51,16 @@ MINUTE = datetime.timedelta(minutes=1)
def filter_samples(
samples: list[ActivitySample],
samples: list[WorkSample],
sample_filter: set[str],
) -> list[ActivitySample]:
) -> list[WorkSample]:
assert len(sample_filter) > 0
return [s for s in samples if set(s.labels).intersection(sample_filter)]
def heuristically_realize_samples(
samples: list[ActivitySample],
) -> Iterator[RealizedActivitySample]:
samples: list[WorkSample],
) -> Iterator[RealizedWorkSample]:
"""Secret sauce.
Guarantees that:
@ -86,9 +87,7 @@ def heuristically_realize_samples(
start_at = max(previous_sample_end, end_at - estimated_duration)
del estimated_duration
yield RealizedActivitySample(
labels=sample.labels, end_at=end_at, start_at=start_at,
)
yield RealizedWorkSample(labels=sample.labels, end_at=end_at, start_at=start_at)
previous_sample_end = sample.end_at
del sample
@ -138,8 +137,8 @@ def parse_arguments():
return parser.parse_args()
def load_samples(args) -> set[ActivitySample]:
shared_time_stamps_set: set[ActivitySample] = set()
def load_samples(args) -> set[WorkSample]:
shared_time_stamps_set: set[WorkSample] = set()
# Git repositories
for repo_path in args.repositories:

19
git_time_tracker/data.py Normal file
View File

@ -0,0 +1,19 @@
import dataclasses
import datetime
from collections.abc import Sequence
# Prefix that marks a label as hidden.
HIDDEN_LABEL_PREFIX = '__'
# Hidden label '__TOTAL', built from the hidden prefix.
HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
@dataclasses.dataclass(frozen=True, order=True)
class WorkSample:
    """Immutable, orderable record: a sequence of labels plus an optional time span.

    Instances are compared field by field in declaration order
    (``order=True``). Comparing a ``None`` timestamp against a datetime
    raises ``TypeError``.
    """

    # Label strings attached to this sample.
    labels: Sequence[str]
    # Start of the span, or None when no start time is available.
    start_at: datetime.datetime | None
    # End of the span, or None when no end time is available.
    end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedWorkSample(WorkSample):
    """A ``WorkSample`` whose start and end are both annotated as concrete datetimes.

    The narrower annotations are not enforced at runtime.
    """

    # Start of the span; annotated as always present.
    start_at: datetime.datetime
    # End of the span; annotated as always present.
    end_at: datetime.datetime

View File

@ -1,7 +1,7 @@
import datetime
from collections.abc import Iterator
from personal_data.activity import HIDDEN_LABEL_CATEGORY, Label, RealizedActivitySample
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
@ -29,18 +29,18 @@ def fmt_year_ranges(years: list[int]) -> str:
return ''.join(list(fmt_year_ranges_internal(years)))
def fmt_line(label: Label, total_time: datetime.timedelta) -> str:
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
hours = int(total_time / HOUR)
minutes = int((total_time - hours * HOUR) / MINUTE)
return f' {label.category:10} {label.label:40} {hours:-4d}h {minutes:-2d}m'
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
def generate_report(
samples: list[RealizedActivitySample],
samples: list[RealizedWorkSample],
) -> Iterator[str]:
# Time spent per label
time_per_label: dict[Label, datetime.timedelta] = {}
years_per_label: dict[Label, set[int]] = {}
time_per_label: dict[str, datetime.timedelta] = {}
years_per_label: dict[str, set[int]] = {}
for sample in samples:
duration = sample.end_at - sample.start_at
@ -57,13 +57,15 @@ def generate_report(
#
yield '-' * 66
yield '\n'
for total_time, label in time_and_label:
if label.category == HIDDEN_LABEL_CATEGORY:
for total_time, label_and_type in time_and_label:
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
continue
yield fmt_line(label, total_time)
label_type, label = label_and_type.split(':', 1)
yield fmt_line(label_type, label, total_time)
yield ' ('
yield fmt_year_ranges(years_per_label.get(label, []))
yield fmt_year_ranges(years_per_label.get(label_and_type, []))
yield ')'
yield '\n'
del label, total_time
@ -71,6 +73,5 @@ def generate_report(
yield '-' * 66
yield '\n'
label_total = Label(HIDDEN_LABEL_CATEGORY, 'total')
yield fmt_line(label_total, time_per_label.get(label_total, ZERO_DURATION))
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
yield '\n'

View File

@ -2,28 +2,30 @@ import datetime
import icalendar
from personal_data.activity import HIDDEN_LABEL_CATEGORY, RealizedActivitySample
from ..data import HIDDEN_LABEL_PREFIX, RealizedWorkSample
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def create_title(sample: RealizedActivitySample) -> tuple[str, str]:
def create_title(sample: RealizedWorkSample) -> tuple[str, str]:
ls = []
desc = []
for label in sample.labels:
if label.category in {HIDDEN_LABEL_CATEGORY, 'author'}:
for label_and_type in sample.labels:
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
continue
if label_and_type.startswith('author:'):
continue
if len(ls) == 0:
ls.append(label.label)
ls.append(label_and_type.split(':')[1])
else:
desc.append(label.label)
desc.append(label_and_type)
return ' '.join(ls), '\n'.join(desc)
def generate_calendar(
samples: list[RealizedActivitySample],
samples: list[RealizedWorkSample],
) -> icalendar.Calendar:
max_title_parts = 2
@ -42,11 +44,11 @@ def generate_calendar(
event.add('dtstart', sample.start_at)
event.add('dtend', sample.end_at)
for label in sample.labels:
if label.category == 'author':
for label_and_type in sample.labels:
if label_and_type.startswith('author:'):
event.add(
'organizer',
'mailto:' + label.label,
'mailto:' + label_and_type.removeprefix('author:'),
)
cal.add_component(event)
@ -56,7 +58,7 @@ def generate_calendar(
def generate_icalendar_file(
samples: list[RealizedActivitySample],
samples: list[RealizedWorkSample],
file: str,
) -> None:
calendar = generate_calendar(samples)

View File

@ -1,17 +1,84 @@
from collections.abc import Iterator
from pathlib import Path
import datetime
import urllib.parse
from typing import Any
from collections.abc import Iterator
from decimal import Decimal
from pathlib import Path
import dataclasses
from personal_data.activity import ActivitySample, Label
from personal_data.csv_import import determine_possible_keys, load_csv_file, start_end
from personal_data.util import load_csv_file
from ..data import WorkSample
def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivitySample]:
@dataclasses.dataclass
class PossibleKeys:
    """Keys of a data row grouped by the role they may play.

    ``determine_possible_keys`` fills the groups from the value types and
    key names of a single row.
    """

    # Date/datetime-valued keys whose name contains 'start'.
    time_start: list[str]
    # Date/datetime-valued keys whose name contains 'end' or 'stop'.
    time_end: list[str]
    # Decimal-valued keys whose name contains 'duration_seconds'.
    duration: list[str]
    # String-valued keys.
    name: list[str]
    # Keys whose value is a urllib.parse.ParseResult.
    image: list[str]
    # Every key that is neither date/datetime-valued nor an image key.
    misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
    """Group the keys of ``event_data`` by value type and key name.

    Args:
        event_data: One row, mapping column name to an already-parsed value.

    Returns:
        A ``PossibleKeys`` where:
        * ``time_start`` / ``time_end`` hold date-valued keys whose lowercased
          name contains 'start', respectively 'end' or 'stop';
        * ``duration`` holds Decimal-valued keys containing 'duration_seconds';
        * ``name`` holds string-valued keys;
        * ``image`` holds keys whose value is a ``ParseResult``;
        * ``misc`` holds all keys except date-valued and image keys.
    """
    temporal: list[str] = []
    durations: list[str] = []
    names: list[str] = []
    images: list[str] = []

    # One pass over the row; a key may land in more than one group.
    for key, value in event_data.items():
        if isinstance(value, datetime.date):
            temporal.append(key)
        if isinstance(value, Decimal) and 'duration_seconds' in key:
            durations.append(key)
        if isinstance(value, str):
            names.append(key)
        if isinstance(value, urllib.parse.ParseResult):
            images.append(key)

    # Everything that is not a time or image key counts as miscellaneous.
    excluded = set(images) | set(temporal)
    misc = [key for key in event_data if key not in excluded]

    starts = [key for key in temporal if 'start' in key.lower()]
    ends = [
        key
        for key in temporal
        if any(word in key.lower() for word in ('end', 'stop'))
    ]

    return PossibleKeys(
        time_start=starts,
        time_end=ends,
        duration=durations,
        name=names,
        image=images,
        misc=misc,
    )
def start_end(
    sample: dict[str, Any],
    keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    """Extract a ``(start, end)`` pair from ``sample`` using the first usable keys.

    Preference order: explicit start and end keys; start plus a duration in
    seconds; start only; end only. Whatever cannot be determined is ``None``.

    Args:
        sample: The row to read values from.
        keys: Key groups as produced by ``determine_possible_keys``.

    Returns:
        Tuple ``(start, end)``; either element may be ``None``.

    Raises:
        KeyError: If ``sample`` lacks a key that ``keys`` names.
    """
    starts, ends, durations = keys.time_start, keys.time_end, keys.duration

    if starts:
        start = sample[starts[0]]
        if ends:
            return (start, sample[ends[0]])
        if durations:
            # The duration column holds seconds; derive the end from it.
            length = datetime.timedelta(seconds=float(sample[durations[0]]))
            return (start, start + length)
        return (start, None)

    if ends:
        return (None, sample[ends[0]])
    return (None, None)
def iterate_samples_from_dicts(rows: list[dict[str,Any]]) -> Iterator[WorkSample]:
assert len(rows) > 0
max_title_parts = 2
if True:
event_data = rows[len(rows) // 2] # Hopefully select a useful representative.
event_data = rows[len(rows)//2] # Hopefully select a useful representative.
possible_keys = determine_possible_keys(event_data)
del event_data
@ -19,19 +86,20 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
assert len(possible_keys.image) >= 0
for event_data in rows:
"""
'''
title = ': '.join(event_data[k] for k in possible_name_keys[:max_title_parts])
description = '\n\n'.join(
event_data[k] for k in possible_name_keys[max_title_parts:]
)
image = event_data[possible_keys.image[0]] if possible_keys.image else None
"""
'''
(start_at, end_at) = start_end(event_data, possible_keys)
labels = [Label(k, event_data[k]) for k in possible_keys.misc]
labels = [f'{k}:{event_data[k]}' for k in possible_keys.misc]
# Create event
yield ActivitySample(
yield WorkSample(
labels=tuple(labels),
start_at=start_at,
end_at=end_at,
@ -40,7 +108,7 @@ def iterate_samples_from_dicts(rows: list[dict[str, Any]]) -> Iterator[ActivityS
del event_data
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[ActivitySample]:
def iterate_samples_from_csv_file(file_path: Path) -> Iterator[WorkSample]:
dicts = load_csv_file(file_path)
samples = list(iterate_samples_from_dicts(dicts))
assert len(samples) > 0, 'Did not found any samples'

View File

@ -5,7 +5,7 @@ from pathlib import Path
import git
from personal_data.activity import HIDDEN_LABEL_CATEGORY, ActivitySample, Label
from ..data import HIDDEN_LABEL_TOTAL, WorkSample
logger = logging.getLogger(__name__)
@ -25,7 +25,7 @@ def determine_project_name(repo: git.Repo) -> str:
return Path(repo.working_tree_dir).name
def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
project_name = determine_project_name(repo)
assert project_name is not None
@ -34,9 +34,9 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
repo.commit()
for commit in repo.iter_commits(determine_default_branch(repo)):
labels = [Label(HIDDEN_LABEL_CATEGORY, 'total')]
labels.append(Label('project', project_name))
labels.append(Label('author', commit.author.email))
labels = [HIDDEN_LABEL_TOTAL]
labels.append('project:' + project_name)
labels.append('author:' + commit.author.email)
authored_date = datetime.datetime.fromtimestamp(
commit.authored_date,
@ -47,13 +47,13 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
tz=datetime.UTC,
)
yield ActivitySample(
yield WorkSample(
labels=tuple(labels),
start_at=None,
end_at=authored_date,
)
if authored_date != committed_date:
yield ActivitySample(
yield WorkSample(
labels=tuple(labels),
start_at=None,
end_at=committed_date,
@ -61,7 +61,7 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[ActivitySample]:
del labels
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[ActivitySample]:
def iterate_samples_from_git_repository(repo_path: Path) -> Iterator[WorkSample]:
try:
yield from get_samples_from_project(git.Repo(repo_path))
except git.exc.InvalidGitRepositoryError:

View File

@ -1,29 +0,0 @@
import dataclasses
import datetime
from collections.abc import Sequence
# Category value that marks a label as hidden.
HIDDEN_LABEL_CATEGORY = '__'
@dataclasses.dataclass(frozen=True, order=True)
class Label:
    """Immutable, orderable ``(category, label)`` pair.

    Construction asserts that both parts are set and that the category
    contains no ':' character.
    """

    category: str
    label: str

    def __post_init__(self) -> None:
        """Sanity-check the freshly constructed instance."""
        kind = self.category
        assert kind is not None and ':' not in kind
        assert self.label is not None
@dataclasses.dataclass(frozen=True, order=True)
class ActivitySample:
    """Immutable, orderable record: a sequence of ``Label`` objects plus an optional time span.

    Instances are compared field by field in declaration order
    (``order=True``). Comparing a ``None`` timestamp against a datetime
    raises ``TypeError``.
    """

    # Labels attached to this sample.
    labels: Sequence[Label]
    # Start of the span, or None when no start time is available.
    start_at: datetime.datetime | None
    # End of the span, or None when no end time is available.
    end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedActivitySample(ActivitySample):
    """An ``ActivitySample`` whose start and end are both annotated as concrete datetimes.

    The narrower annotations are not enforced at runtime.
    """

    # Start of the span; annotated as always present.
    start_at: datetime.datetime
    # End of the span; annotated as always present.
    end_at: datetime.datetime

View File

@ -1,150 +0,0 @@
import csv
import dataclasses
import datetime
import decimal
import typing
import urllib.parse
from collections.abc import Callable
from decimal import Decimal
from pathlib import Path
from typing import Any
from frozendict import frozendict
# Name under which this module's CSV dialect is registered with the csv module.
CSV_DIALECT = 'one_true_dialect'
# Register the dialect: writers end rows with '\n', and whitespace directly
# following a delimiter is ignored.
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def csv_str_to_value(
    s: str,
) -> (
    str
    | Decimal
    | datetime.date
    | datetime.datetime
    | urllib.parse.ParseResult
    | bool
    | None
):
    """Convert the text of one CSV cell into the most specific value it parses as.

    After stripping surrounding whitespace the following are tried in order:
    ``Decimal``, ISO date, ISO datetime, http(s) URL, the case-insensitive
    keywords 'false' / 'true' / 'none'. Anything else is returned as the
    stripped string. ``None`` and blank input yield ``None``.
    """
    assert not isinstance(s, list)  # TODO?

    if s is None:
        return None
    text = s.strip()
    if not text:
        return None

    # The first parser that accepts the text wins.
    parsers = (
        Decimal,
        datetime.date.fromisoformat,
        datetime.datetime.fromisoformat,
    )
    for parser in parsers:
        parsed = try_value(parser, text)
        if parsed is not None:
            return parsed

    if text.startswith(('http://', 'https://')):
        return urllib.parse.urlparse(text)

    keywords = {'false': False, 'true': True, 'none': None}
    lowered = text.lower()
    if lowered in keywords:
        return keywords[lowered]
    return text
def load_csv_file(csv_file: Path, sniff=False) -> list[frozendict[str, typing.Any]]:
    """Read a CSV file into a list of immutable row mappings.

    Every cell is converted with ``csv_str_to_value``. Cells that convert to
    ``None`` (for example blank cells) are left out of the resulting row.

    Args:
        csv_file: Path of the CSV file to read.
        sniff: When true, detect the dialect from the first 1024 characters
            with ``csv.Sniffer``; otherwise ``CSV_DIALECT`` is used.

    Returns:
        One ``frozendict`` per data row, in file order.

    Raises:
        OSError: If the file cannot be opened.
        csv.Error: If dialect sniffing fails or the CSV data is malformed.
    """
    dicts: list[frozendict] = []
    # newline='' lets the csv module do its own newline handling, so line
    # breaks inside quoted fields are read back unchanged.
    with open(csv_file, newline='') as csvfile:
        if sniff:
            dialect = csv.Sniffer().sniff(csvfile.read(1024))
            csvfile.seek(0)
        else:
            dialect = CSV_DIALECT
        for row in csv.DictReader(csvfile, dialect=dialect):
            converted = ((key, csv_str_to_value(cell)) for key, cell in row.items())
            dicts.append(
                frozendict(
                    {key: value for key, value in converted if value is not None},
                ),
            )
    return dicts
@dataclasses.dataclass
class PossibleKeys:
    """Keys of a data row grouped by the role they may play.

    ``determine_possible_keys`` fills the groups from the value types and
    key names of a single row.
    """

    # Date/datetime-valued keys whose name contains 'start'.
    time_start: list[str]
    # Date/datetime-valued keys whose name contains 'end', 'stop' or 'last'.
    time_end: list[str]
    # Decimal-valued keys whose name contains 'duration_seconds'.
    duration: list[str]
    # String-valued keys.
    name: list[str]
    # Keys whose value is a urllib.parse.ParseResult.
    image: list[str]
    # Every key that is neither date/datetime-valued nor an image key.
    misc: list[str]
def determine_possible_keys(event_data: dict[str, Any]) -> PossibleKeys:
    """Group the keys of ``event_data`` by value type and key name.

    Args:
        event_data: One row, mapping column name to an already-parsed value.

    Returns:
        A ``PossibleKeys`` where:
        * ``time_start`` / ``time_end`` hold date-valued keys whose lowercased
          name contains 'start', respectively 'end', 'stop' or 'last';
        * ``duration`` holds Decimal-valued keys containing 'duration_seconds';
        * ``name`` holds string-valued keys;
        * ``image`` holds keys whose value is a ``ParseResult``;
        * ``misc`` holds all keys except date-valued and image keys.
    """
    temporal: list[str] = []
    durations: list[str] = []
    names: list[str] = []
    images: list[str] = []

    # One pass over the row; a key may land in more than one group.
    for key, value in event_data.items():
        if isinstance(value, datetime.date):
            temporal.append(key)
        if isinstance(value, Decimal) and 'duration_seconds' in key:
            durations.append(key)
        if isinstance(value, str):
            names.append(key)
        if isinstance(value, urllib.parse.ParseResult):
            images.append(key)

    # Everything that is not a time or image key counts as miscellaneous.
    excluded = set(images) | set(temporal)
    misc = [key for key in event_data if key not in excluded]

    starts = [key for key in temporal if 'start' in key.lower()]
    ends = [
        key
        for key in temporal
        if any(word in key.lower() for word in ('end', 'stop', 'last'))
    ]

    return PossibleKeys(
        time_start=starts,
        time_end=ends,
        duration=durations,
        name=names,
        image=images,
        misc=misc,
    )
def start_end(
    sample: dict[str, Any], keys: PossibleKeys,
) -> tuple[datetime.datetime | None, datetime.datetime | None]:
    """Extract a ``(start, end)`` pair from ``sample`` using the first usable keys.

    Preference order: explicit start and end keys; start plus a duration in
    seconds; start only; end only. Whatever cannot be determined is ``None``.

    Args:
        sample: The row to read values from.
        keys: Key groups as produced by ``determine_possible_keys``.

    Returns:
        Tuple ``(start, end)``; either element may be ``None``.

    Raises:
        KeyError: If ``sample`` lacks a key that ``keys`` names.
    """
    starts, ends, durations = keys.time_start, keys.time_end, keys.duration

    if starts:
        start = sample[starts[0]]
        if ends:
            return (start, sample[ends[0]])
        if durations:
            # The duration column holds seconds; derive the end from it.
            length = datetime.timedelta(seconds=float(sample[durations[0]]))
            return (start, start + length)
        return (start, None)

    if ends:
        return (None, sample[ends[0]])
    return (None, None)

View File

@ -57,12 +57,12 @@ def parse_time(text: str) -> datetime.datetime:
time = try_parse(text, '%d %b %Y %I:%M:%S %p')
time = time or try_parse(text, '%d %b, %Y @ %I:%M%p')
if time is None and (m := try_parse(text, '%d %b @ %I:%M%p')):
time = m.replace(year=NOW.year)
time = m.replace(year = NOW.year)
assert time is not None, 'Could not parse format'
if time.tzinfo is None:
time = time.replace(tzinfo=LOCAL_TIMEZONE)
time = time.replace(tzinfo=LOCAL_TIMEZONE )
assert time.tzinfo is not None, time
return time

View File

@ -1,22 +1,68 @@
import _csv
import csv
import datetime
import decimal
import io
import logging
import typing
import urllib.parse
from collections.abc import Iterable, Mapping, Sequence
from collections.abc import Callable, Iterable, Mapping, Sequence
from decimal import Decimal
from pathlib import Path
from typing import Any
from frozendict import frozendict
from . import csv_import, data
from . import data
logger = logging.getLogger(__name__)
# Name under which this module's CSV dialect is registered with the csv module.
CSV_DIALECT = 'one_true_dialect'
# Register the dialect: writers end rows with '\n', and whitespace directly
# following a delimiter is ignored.
csv.register_dialect(CSV_DIALECT, lineterminator='\n', skipinitialspace=True)
def csv_safe_value(v: Any) -> str:
T = typing.TypeVar('T')
def try_value(fn: Callable[[str], T], s: str) -> T | None:
try:
return fn(s)
except (ValueError, decimal.InvalidOperation):
return None
def csv_str_to_value(
    s: str,
) -> (
    str
    | Decimal
    | datetime.date
    | datetime.datetime
    | urllib.parse.ParseResult
    | bool
    | None
):
    """Convert the text of one CSV cell into the most specific value it parses as.

    After stripping surrounding whitespace the following are tried in order:
    ``Decimal``, ISO date, ISO datetime, http(s) URL, the case-insensitive
    keywords 'false' / 'true' / 'none'. Anything else is returned as the
    stripped string. ``None`` and blank input yield ``None``.
    """
    if s is None:
        return None
    text = s.strip()
    if not text:
        return None

    # The first parser that accepts the text wins.
    parsers = (
        Decimal,
        datetime.date.fromisoformat,
        datetime.datetime.fromisoformat,
    )
    for parser in parsers:
        parsed = try_value(parser, text)
        if parsed is not None:
            return parsed

    if text.startswith(('http://', 'https://')):
        return urllib.parse.urlparse(text)

    keywords = {'false': False, 'true': True, 'none': None}
    lowered = text.lower()
    if lowered in keywords:
        return keywords[lowered]
    return text
def csv_safe_value(v: object) -> str:
if isinstance(v, urllib.parse.ParseResult):
return v.geturl()
if isinstance(v, datetime.datetime):
@ -99,13 +145,32 @@ def deduplicate_dicts(
def normalize_dict(d: dict[str, typing.Any]) -> frozendict[str, typing.Any]:
return frozendict(
{
k: csv_import.csv_str_to_value(str(v))
k: csv_str_to_value(str(v))
for k, v in d.items()
if csv_import.csv_str_to_value(str(v)) is not None
if csv_str_to_value(str(v)) is not None
},
)
def load_csv_file(csv_file: Path) -> list[frozendict[str, typing.Any]]:
    """Read a CSV file into a list of immutable row mappings.

    The dialect is detected with ``csv.Sniffer`` from the first 1024
    characters. Every cell is converted with ``csv_str_to_value``. Cells that
    convert to ``None`` (for example blank cells) are left out of the row.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        One ``frozendict`` per data row, in file order.

    Raises:
        OSError: If the file cannot be opened.
        csv.Error: If dialect sniffing fails or the CSV data is malformed.
    """
    dicts: list[frozendict] = []
    # newline='' lets the csv module do its own newline handling, so line
    # breaks inside quoted fields are read back unchanged.
    with open(csv_file, newline='') as csvfile:
        dialect = csv.Sniffer().sniff(csvfile.read(1024))
        csvfile.seek(0)
        for row in csv.DictReader(csvfile, dialect=dialect):
            converted = ((key, csv_str_to_value(cell)) for key, cell in row.items())
            dicts.append(
                frozendict(
                    {key: value for key, value in converted if value is not None},
                ),
            )
    return dicts
def extend_csv_file(
csv_file: Path,
new_dicts: list[dict[str, typing.Any]],
@ -115,7 +180,7 @@ def extend_csv_file(
assert isinstance(deduplicate_ignore_columns, list), deduplicate_ignore_columns
try:
dicts = csv_import.load_csv_file(csv_file)
dicts = load_csv_file(csv_file)
except (FileNotFoundError, _csv.Error) as e:
logger.info('Creating file: %s', csv_file)
dicts = []
@ -134,7 +199,7 @@ def extend_csv_file(
writer = csv.DictWriter(
csvfile_in_memory,
fieldnames=fieldnames,
dialect=csv_import.CSV_DIALECT,
dialect=CSV_DIALECT,
)
writer.writeheader()
for d in dicts:

View File

@ -1,24 +0,0 @@
import datetime
import frozendict
from personal_data.csv_import import determine_possible_keys
def test_determine_possible_keys():
    """A datetime column named '...last_played_time' is classified as an end time."""
    last_played = datetime.datetime(
        2021, 6, 13, 19, 12, 21, tzinfo=datetime.timezone.utc,
    )
    row = frozendict.frozendict(
        {
            'game.name': 'Halo',
            'me.last_played_time': last_played,
            'trophy.name': 'Test',
            'trophy.desc': 'Description',
        },
    )

    result = determine_possible_keys(row)

    assert result.time_start == []
    assert result.time_end == ['me.last_played_time']
    assert result.duration == []
    assert len(result.name) == 3

View File

@ -3,37 +3,13 @@ from decimal import Decimal
import pytest
from personal_data.csv_import import csv_str_to_value
from personal_data.util import csv_str_to_value
PARSE_MAPPINGS = [
(
'2024-04-28 21:35:40+00:00',
datetime.datetime(2024, 4, 28, 21, 35, 40, tzinfo=datetime.UTC),
),
(
'2024-07-06 19:30:11+02:00',
datetime.datetime(
2024,
7,
6,
19,
30,
11,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
(
'2023-10-21 11:43:27+02:00',
datetime.datetime(
2023,
10,
21,
11,
43,
27,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
),
(
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',
'0003791e9f5f3691b8bbbe0d12a7ae9c3f2e89db38',