1
0

Compare commits

..

4 Commits

Author SHA1 Message Date
7ab46bc48e
Improved title
Some checks failed
Test Python / Test (push) Failing after 27s
2024-08-26 00:37:51 +02:00
29c6723867
iCalendar output format 2024-08-26 00:26:04 +02:00
0d4d7bad12
Filtering and heuristic is now done outside of output format 2024-08-26 00:14:12 +02:00
927b603f27
Moved output formats into own submodule 2024-08-25 23:48:08 +02:00
8 changed files with 224 additions and 88 deletions

View File

@ -32,105 +32,57 @@ import sys
from collections.abc import Iterator
from pathlib import Path
from .data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, WorkSample
from .data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, WorkSample, RealizedWorkSample
from .format import cli, icalendar
from .source import git_repo
logger = logging.getLogger(__name__)
DEFAULT_EST_TIME = datetime.timedelta(hours=1)
DEFAULT_ESTIMATED_DURATION = datetime.timedelta(hours=1)
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def fmt_year_ranges_internal(years: list[int]) -> Iterator[str]:
years = sorted(years)
for idx, year in enumerate(years):
at_end = idx == len(years) - 1
range_before = idx > 0 and years[idx - 1] == year - 1
range_after = not at_end and years[idx + 1] == year + 1
def filter_samples(samples: list[WorkSample], sample_filter: set[str]) -> list[WorkSample]:
    """Return the samples that carry at least one label from `sample_filter`.

    The relative order of `samples` is preserved. `sample_filter` must be
    non-empty; callers are expected to skip filtering entirely otherwise.
    """
    assert len(sample_filter) > 0
    kept = []
    for candidate in samples:
        # A sample matches as soon as it shares any label with the filter.
        if not set(candidate.labels).isdisjoint(sample_filter):
            kept.append(candidate)
    return kept
if not range_before or not range_after:
yield str(year)
def heuristically_realize_samples(samples: list[WorkSample]) -> Iterator[RealizedWorkSample]:
"""Secret sauce.
if not at_end:
if not range_before and range_after:
yield '-'
elif not range_after:
yield ','
Guarantees that:
* No samples overlap.
"""
def fmt_year_ranges(years: list[int]) -> str:
return ''.join(list(fmt_year_ranges_internal(years)))
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
hours = int(total_time / HOUR)
minutes = int((total_time - hours * HOUR) / MINUTE)
return f' {label_type:10} {label:40} {hours:-4d}h {minutes:-2d}m'
def generate_report(
samples: list[WorkSample], sample_filter=frozenset(),
) -> Iterator[str]:
LABEL_FILTER = {}
# Time spent per label
time_per_label: dict[str, datetime.timedelta] = {}
years_per_label: dict[str, set[int]] = {}
prev_time = datetime.datetime.fromtimestamp(0, datetime.UTC)
previous_sample_end = datetime.datetime.fromtimestamp(0, datetime.UTC)
for sample in samples:
est_time: datetime.timedelta = DEFAULT_EST_TIME
est_time = min(sample.registered_at - prev_time, est_time)
end_at = sample.end_at
if len(sample_filter) == 0:
pass
elif not set(sample.labels).intersection(sample_filter):
continue
assert previous_sample_end <= end_at, 'Iterating in incorrect order'
for label in sample.labels:
time_per_label.setdefault(label, ZERO_DURATION)
time_per_label[label] += est_time
years_per_label.setdefault(label, set()).add(sample.registered_at.year)
# TODO: Allow end_at is None
prev_time = sample.registered_at
del sample, est_time
start_at = sample.start_at
if start_at is None:
estimated_duration: datetime.timedelta = DEFAULT_ESTIMATED_DURATION
start_at = max(previous_sample_end, end_at - estimated_duration)
del estimated_duration
time_and_label = [(duration, label) for label, duration in time_per_label.items()]
time_and_label.sort(reverse=True)
#
yield '-' * 66
yield '\n'
for total_time, label_and_type in time_and_label:
if label_and_type.startswith(HIDDEN_LABEL_PREFIX):
continue
label_type, label = label_and_type.split(':', 1)
if len(LABEL_FILTER) > 0 and label_type not in LABEL_FILTER:
continue
yield fmt_line(label_type, label, total_time)
yield ' ('
yield fmt_year_ranges(years_per_label.get(label_and_type, []))
yield ')'
yield '\n'
del label, total_time
yield '-' * 66
yield '\n'
yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
yield '\n'
yield RealizedWorkSample(labels=sample.labels, end_at=end_at, start_at=start_at)
previous_sample_end = sample.end_at
del sample
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument(
'--git-repo', action='extend', nargs='+', type=Path, dest='repositories',
'--git-repo',
action='extend',
nargs='+',
type=Path,
dest='repositories',
)
parser.add_argument(
'--filter',
@ -140,6 +92,14 @@ def parse_arguments():
dest='sample_filter',
default=[],
)
parser.add_argument(
'--format',
action='store',
type=str,
dest='format_mode',
default='cli_report',
choices=['cli_report', 'icalendar'],
)
return parser.parse_args()
@ -148,14 +108,27 @@ def main():
args = parse_arguments()
shared_time_stamps: set[WorkSample] = set()
shared_time_stamps_set: set[WorkSample] = set()
for repo_path in args.repositories:
logger.warning('Visit %s', repo_path)
shared_time_stamps |= set(
shared_time_stamps_set |= set(
git_repo.iterate_samples_from_git_repository(repo_path),
)
shared_time_stamps = sorted(shared_time_stamps)
shared_time_stamps = sorted(shared_time_stamps_set, key = lambda s: s.end_at)
del shared_time_stamps_set
for t in generate_report(shared_time_stamps, sample_filter=args.sample_filter):
sys.stdout.write(t)
sample_filter = args.sample_filter
if len(sample_filter) != 0:
logger.warning('Filtering %s samples', len(shared_time_stamps))
shared_time_stamps = filter_samples(shared_time_stamps, sample_filter)
logger.warning('Filtered down to %s samples', len(shared_time_stamps))
logger.warning('Realizing %s samples', len(shared_time_stamps))
shared_time_stamps = list(heuristically_realize_samples(shared_time_stamps))
if args.format_mode == 'cli_report':
for t in cli.generate_report(shared_time_stamps):
sys.stdout.write(t)
elif args.format_mode == 'icalendar':
icalendar.generate_icalendar_file(shared_time_stamps, file='./output/samples.ics')

View File

@ -8,5 +8,11 @@ HIDDEN_LABEL_TOTAL = HIDDEN_LABEL_PREFIX + 'TOTAL'
@dataclasses.dataclass(frozen=True, order=True)
class WorkSample:
registered_at: datetime.datetime
labels: Sequence[str]
start_at: datetime.datetime | None
end_at: datetime.datetime | None
@dataclasses.dataclass(frozen=True, order=True)
class RealizedWorkSample(WorkSample):
    """A `WorkSample` whose start and end are both annotated as always set.

    Re-declares `start_at` and `end_at` without the `| None` alternative.
    NOTE(review): this is an annotation-level narrowing only; nothing in this
    class checks the values at runtime.
    """

    # Beginning of the work interval (not optional in this subclass).
    start_at: datetime.datetime
    # End of the work interval (not optional in this subclass).
    end_at: datetime.datetime

View File

@ -0,0 +1 @@
"""Submodule containing output formats."""

View File

@ -0,0 +1,77 @@
import datetime
from collections.abc import Iterator
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def fmt_year_ranges_internal(years: list[int]) -> Iterator[str]:
    """Yield the string pieces of a compact year-range listing.

    Consecutive years are collapsed into ``first-last`` and separate runs are
    joined by ``,``; e.g. ``[2020, 2021, 2022, 2024]`` yields the pieces of
    ``'2020-2022,2024'``. Joining the yielded pieces gives the final text.

    Any iterable of ints is accepted (callers pass sets as well as lists).
    Duplicate years are ignored, so ``[2020, 2020]`` yields just ``'2020'``
    rather than ``'2020,2020'``.
    """
    ordered = sorted(set(years))
    last_index = len(ordered) - 1
    for idx, year in enumerate(ordered):
        is_last = idx == last_index
        continues_run = idx > 0 and ordered[idx - 1] == year - 1
        run_continues = not is_last and ordered[idx + 1] == year + 1

        # Years strictly inside a run are implied by the 'first-last' form.
        if continues_run and run_continues:
            continue

        yield str(year)
        if is_last:
            continue
        # A run boundary: '-' when this year opens a run, ',' when it ends
        # one (or stands alone) and more years follow.
        yield '-' if run_continues else ','
def fmt_year_ranges(years: list[int]) -> str:
    """Return `years` as compact range text, e.g. ``'2020-2022,2024'``."""
    pieces = fmt_year_ranges_internal(years)
    return ''.join(pieces)
def fmt_line(label_type: str, label: str, total_time: datetime.timedelta) -> str:
    """Format one report row: label type, label, and the time as hours/minutes.

    The duration is split into whole hours and the whole minutes left over;
    both are truncated with `int()`, so seconds are dropped.
    """
    whole_hours = int(total_time / HOUR)
    leftover = total_time - whole_hours * HOUR
    whole_minutes = int(leftover / MINUTE)
    return f' {label_type:10} {label:40} {whole_hours:-4d}h {whole_minutes:-2d}m'
def generate_report(
    samples: list[RealizedWorkSample],
) -> Iterator[str]:
    """Yield the text chunks of a per-label time report.

    Each sample's duration (`end_at - start_at`) is added to every label it
    carries, and the year of `end_at` is recorded per label. Labels are listed
    by descending total time; labels starting with `HIDDEN_LABEL_PREFIX` are
    not listed. The final row shows the total stored under
    `HIDDEN_LABEL_TOTAL`.
    """
    separator = '-' * 66

    # Accumulate total duration and the set of active years for each label.
    time_per_label: dict[str, datetime.timedelta] = {}
    years_per_label: dict[str, set[int]] = {}
    for sample in samples:
        elapsed = sample.end_at - sample.start_at
        for label in sample.labels:
            time_per_label[label] = time_per_label.get(label, ZERO_DURATION) + elapsed
            years_per_label.setdefault(label, set()).add(sample.end_at.year)

    # Largest totals first; ties fall back to the label text.
    ranked = sorted(
        ((total, name) for name, total in time_per_label.items()),
        reverse=True,
    )

    yield separator
    yield '\n'
    for total_time, label_and_type in ranked:
        if not label_and_type.startswith(HIDDEN_LABEL_PREFIX):
            label_type, label = label_and_type.split(':', 1)
            yield fmt_line(label_type, label, total_time)
            yield ' ('
            yield fmt_year_ranges(years_per_label.get(label_and_type, []))
            yield ')'
            yield '\n'
    yield separator
    yield '\n'

    yield fmt_line('', 'TOTAL', time_per_label.get(HIDDEN_LABEL_TOTAL, ZERO_DURATION))
    yield '\n'

View File

@ -0,0 +1,71 @@
import datetime
from collections.abc import Iterator
import argparse
import datetime
import urllib.parse
import icalendar
from personal_data.util import load_csv_file
from ..data import HIDDEN_LABEL_PREFIX, HIDDEN_LABEL_TOTAL, RealizedWorkSample
ZERO_DURATION = datetime.timedelta(seconds=0)
HOUR = datetime.timedelta(hours=1)
MINUTE = datetime.timedelta(minutes=1)
def create_title(sample: RealizedWorkSample) -> str:
    """Build an event title from the sample's labels, joined by spaces.

    Labels starting with `HIDDEN_LABEL_PREFIX` or with ``'author:'`` are left
    out of the title; the remaining labels keep their original order.
    """
    excluded_prefixes = (HIDDEN_LABEL_PREFIX, 'author:')
    return ' '.join(
        label for label in sample.labels if not label.startswith(excluded_prefixes)
    )
def _event_uid(sample: RealizedWorkSample) -> str:
    """Derive a stable event UID from the sample's time span and labels."""
    # Local import keeps this block self-contained.
    import hashlib

    identity = '\n'.join(
        (sample.start_at.isoformat(), sample.end_at.isoformat(), *sample.labels),
    )
    digest = hashlib.sha256(identity.encode('utf-8')).hexdigest()
    return f'{digest}@personal_data_calendar'


def generate_calendar(
    samples: list[RealizedWorkSample],
) -> icalendar.Calendar:
    """Build an iCalendar calendar with one event per sample.

    Each event gets the title from `create_title` as its summary, an empty
    description, the sample's `start_at`/`end_at` as DTSTART/DTEND, and an
    ORGANIZER for every ``'author:'`` label on the sample.

    Events also carry UID and DTSTAMP, which RFC 5545 requires on every
    VEVENT. The UID is derived from the sample's content, so regenerating the
    calendar gives the same UID for the same sample.
    """
    cal = icalendar.Calendar()
    cal.add('prodid', '-//personal_data_calendar//example.org//')
    cal.add('version', '2.0')

    # One creation timestamp shared by all events of this calendar.
    generated_at = datetime.datetime.now(tz=datetime.UTC)

    for sample in samples:
        title = create_title(sample)
        description = ''

        # Create event
        event = icalendar.Event()
        event.add('uid', _event_uid(sample))
        event.add('dtstamp', generated_at)
        event.add('summary', title)
        event.add('description', description)
        event.add('dtstart', sample.start_at)
        event.add('dtend', sample.end_at)

        for label_and_type in sample.labels:
            if label_and_type.startswith('author:'):
                event.add('organizer', 'mailto:' + label_and_type.removeprefix('author:'))

        cal.add_component(event)

    return cal
def generate_icalendar_file(
    samples: list[RealizedWorkSample],
    file: str,
) -> None:
    """Write `samples` as an iCalendar file at the path `file`.

    The calendar is built and serialised before the file is opened, so a
    failure while generating it does not leave a truncated or empty file
    behind (opening with ``'wb'`` truncates immediately).
    """
    payload = generate_calendar(samples).to_ical()
    with open(file, 'wb') as f:
        f.write(payload)

View File

@ -0,0 +1 @@
"""Submodule containing input formats."""

View File

@ -37,14 +37,21 @@ def get_samples_from_project(repo: git.Repo) -> Iterator[WorkSample]:
labels = [HIDDEN_LABEL_TOTAL]
labels.append('project:' + project_name)
labels.append('author:' + commit.author.email)
authored_date = datetime.datetime.fromtimestamp(commit.authored_date, tz=datetime.UTC)
committed_date = datetime.datetime.fromtimestamp(commit.committed_date, tz=datetime.UTC)
yield WorkSample(
datetime.datetime.fromtimestamp(commit.authored_date, tz=datetime.UTC),
tuple(labels),
)
yield WorkSample(
datetime.datetime.fromtimestamp(commit.committed_date, tz=datetime.UTC),
tuple(labels),
labels = tuple(labels),
start_at = None,
end_at = authored_date,
)
if authored_date != committed_date:
yield WorkSample(
labels = tuple(labels),
start_at = None,
end_at = committed_date,
)
del labels

0
test/__init__.py Normal file
View File